blob: 2342dddf588cb7773ac2a04cd82059f6d4814c85 [file] [log] [blame]
Jiri Simsad7616c92015-03-24 23:44:30 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
Jiri Simsa5293dcb2014-05-10 09:56:38 -07005package vif
6
7// Logging guidelines:
8// vlog.VI(1) for per-net.Conn information
9// vlog.VI(2) for per-VC information
10// vlog.VI(3) for per-Flow information
11
12import (
13 "bytes"
14 "fmt"
15 "net"
Robin Thellend5bd72422015-02-17 12:36:38 -080016 "sort"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070017 "strings"
18 "sync"
Jungho Ahncd175b82015-03-27 14:29:40 -070019 "time"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070020
Ankur50a5f392015-02-27 18:46:30 -080021 "v.io/v23/context"
Jiri Simsa6ac95222015-02-23 16:11:49 -080022 "v.io/v23/naming"
Ankur50a5f392015-02-27 18:46:30 -080023 "v.io/v23/security"
Jiri Simsa6ac95222015-02-23 16:11:49 -080024 "v.io/v23/verror"
Jiri Simsa6ac95222015-02-23 16:11:49 -080025 "v.io/v23/vtrace"
Jungho Ahncd175b82015-03-27 14:29:40 -070026
Jiri Simsa337af232015-02-27 14:36:46 -080027 "v.io/x/lib/vlog"
Matt Rosencrantzdbc1be22015-02-28 15:15:49 -080028 "v.io/x/ref/profiles/internal/lib/bqueue"
29 "v.io/x/ref/profiles/internal/lib/bqueue/drrqueue"
30 "v.io/x/ref/profiles/internal/lib/iobuf"
31 "v.io/x/ref/profiles/internal/lib/pcqueue"
Matt Rosencrantz86ba1a12015-03-09 13:19:02 -070032 vsync "v.io/x/ref/profiles/internal/lib/sync"
33 "v.io/x/ref/profiles/internal/lib/upcqueue"
Matt Rosencrantzc16339c2015-04-23 10:47:06 -070034 inaming "v.io/x/ref/profiles/internal/naming"
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070035 "v.io/x/ref/profiles/internal/rpc/stream"
36 "v.io/x/ref/profiles/internal/rpc/stream/crypto"
37 "v.io/x/ref/profiles/internal/rpc/stream/id"
38 "v.io/x/ref/profiles/internal/rpc/stream/message"
39 "v.io/x/ref/profiles/internal/rpc/stream/vc"
Matt Rosencrantz0e207172015-04-16 14:58:02 -070040 iversion "v.io/x/ref/profiles/internal/rpc/version"
Jiri Simsa5293dcb2014-05-10 09:56:38 -070041)
42
Matt Rosencrantz94502cf2015-03-18 09:43:44 -070043const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif"
Mike Burrows35baad92015-02-10 13:42:53 -080044
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -070045func reg(id, msg string) verror.IDAction {
46 return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg)
47}
48
// Errors registered by this package. Each ID is pkgPath followed by the
// suffix passed to reg.
//
// NOTE(review): the suffixes registered for errFlowsNoLongerAccepted
// (".errFlowsNowLongerAccepted") and errVIFAlreadySetup
// (".errVIFAlreadySetupt") do not match their variable names, and the
// errWriteFailed format string ends with an unbalanced ')'. They are left
// untouched here because IDs and messages are visible at runtime; confirm
// nothing depends on the current spellings before correcting them.
var (
	// These errors are intended to be used as arguments to higher
	// level errors and hence {1}{2} is omitted from their format
	// strings to avoid repeating these n-times in the final error
	// message visible to the user.
	errShuttingDown             = reg(".errShuttingDown", "underlying network connection({3}) shutting down")
	errVCHandshakeFailed        = reg(".errVCHandshakeFailed", "VC handshake failed{:3}")
	errSendOnExpressQFailed     = reg(".errSendOnExpressQFailed", "vif.sendOnExpressQ(OpenVC) failed{:3}")
	errVIFIsBeingClosed         = reg(".errVIFIsBeingClosed", "VIF is being closed")
	errVIFAlreadyAcceptingFlows = reg(".errVIFAlreadyAcceptingFlows", "already accepting flows on VIF {3}")
	errVCsNotAcceptedOnVIF      = reg(".errVCsNotAcceptedOnVIF", "VCs not accepted on VIF {3}")
	errAcceptFailed             = reg(".errAcceptFailed", "Accept failed{:3}")
	errRemoteEndClosedVC        = reg(".errRemoteEndClosedVC", "remote end closed VC{:3}")
	errFlowsNoLongerAccepted    = reg(".errFlowsNowLongerAccepted", "Flows no longer being accepted")
	errVCAcceptFailed           = reg(".errVCAcceptFailed", "VC accept failed{:3}")
	errIdleTimeout              = reg(".errIdleTimeout", "idle timeout")
	errVIFAlreadySetup          = reg(".errVIFAlreadySetupt", "VIF is already setup")
	errBqueueWriterForXpress    = reg(".errBqueueWriterForXpress", "failed to create bqueue.Writer for express messages{:3}")
	errBqueueWriterForControl   = reg(".errBqueueWriterForControl", "failed to create bqueue.Writer for flow control counters{:3}")
	errBqueueWriterForStopping  = reg(".errBqueueWriterForStopping", "failed to create bqueue.Writer for stopping the write loop{:3}")
	errWriteFailed              = reg(".errWriteFailed", "write failed: got ({3}, {4}) for {5} byte message)")
)
71
// VIF implements a "virtual interface" over an underlying network connection
// (net.Conn). Just like multiple network connections can be established over a
// single physical interface, multiple Virtual Circuits (VCs) can be
// established over a single VIF.
type VIF struct {
	// conn is the underlying network connection. pool is the iobuf pool and
	// reader the iobuf reader layered over conn; both are created by the
	// constructors. localEP is computed from conn, the routing ID and the
	// version range when the VIF is built.
	//
	// All reads must be performed through reader, and not directly through conn.
	conn    net.Conn
	pool    *iobuf.Pool
	reader  *iobuf.Reader
	localEP naming.Endpoint

	// ctrlCipher is normally guarded by writeMu, however see the exception in
	// readLoop.
	ctrlCipher crypto.ControlCipher
	writeMu    sync.Mutex

	// startTimer is created only when a positive start timeout is supplied
	// at construction, with vif.Close as its callback.
	// NOTE(review): presumably guarded by muStartTimer - confirm against the
	// code that stops the timer.
	muStartTimer sync.Mutex
	startTimer   timer

	// vcMap holds the VCs established over this VIF. idleTimerMap invokes
	// its callback with a VCI; the callback installed at construction closes
	// that VC with errIdleTimeout. Close waits on wpending before closing
	// conn; stopVCDispatchLoops waits on rpending, which each vcDispatchLoop
	// marks Done when it exits.
	vcMap              *vcMap
	idleTimerMap       *idleTimerMap
	wpending, rpending vsync.WaitGroup

	// acceptor is nil while the VIF is not accepting VCs/Flows. principal
	// and blessings are passed to the handshake of VCs accepted on this VIF.
	muListen     sync.Mutex
	acceptor     *upcqueue.T          // GUARDED_BY(muListen)
	listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen)
	principal    security.Principal
	blessings    security.Blessings

	// nextVCI is seeded with the initial VCI chosen by the constructor
	// (dialed and accepted VIFs start from different values).
	// NOTE(review): presumably guarded by muNextVCI - confirm in allocVCI.
	muNextVCI sync.Mutex
	nextVCI   id.VC

	// outgoing is the queue of outbound messages; expressQ, flowQ and stopQ
	// are writers on it that are created with flow control disabled.
	outgoing bqueue.T
	expressQ bqueue.Writer

	// NOTE(review): flowCounters is presumably guarded by flowMu - confirm.
	flowQ        bqueue.Writer
	flowMu       sync.Mutex
	flowCounters message.Counters

	// stopQ is closed by Close before it waits for pending writes to drain.
	stopQ bqueue.Writer

	// The RPC version range supported by this VIF. In practice this is
	// non-nil only in testing. nil is equivalent to using the versions
	// actually supported by this RPC implementation (which is always
	// what you want outside of tests).
	//
	// Note that internalNew substitutes iversion.SupportedRange when it is
	// handed nil, so the field itself is non-nil on VIFs it constructs.
	versions *iversion.Range

	// isClosed is set by the first call to Close; later calls return
	// immediately. onClose, if non-nil, is run by Close in its own goroutine
	// after the connection has been closed.
	isClosedMu sync.Mutex
	isClosed   bool // GUARDED_BY(isClosedMu)
	onClose    func(*VIF)

	// All sets that this VIF is in.
	muSets sync.Mutex
	sets   []*Set // GUARDED_BY(muSets)

	// These counters track the number of messages sent and received by
	// this VIF. handleMessage updates them under muMsgCounters using keys
	// of the form "Recv(<message type>)".
	muMsgCounters sync.Mutex
	msgCounters   map[string]int64
}
132
// ConnectorAndFlow represents a Flow and the Connector that can be used to
// create another Flow over the same underlying VC.
//
// The field order is significant: values of this type are also built with
// positional (unkeyed) composite literals.
type ConnectorAndFlow struct {
	// Connector creates further Flows over the same VC.
	Connector stream.Connector
	// Flow is nil when the value announces a newly established VC rather
	// than a new Flow (see the sample usage on VIF.Accept).
	Flow stream.Flow
}
139
// Separate out constants that are not exported so that godoc looks nicer for
// the exported ones.
const (
	// Priorities of the buffered queues used for flow control of writes.
	// The values are assigned by iota, so expressPriority is 0,
	// controlPriority is 1 and flowPriority is 2.
	expressPriority bqueue.Priority = iota
	controlPriority
	// The range of flow priorities is [flowPriority, flowPriority + NumFlowPriorities)
	flowPriority
	// stopPriority is the first value past the range of flow priorities.
	stopPriority = flowPriority + vc.NumFlowPriorities
)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700150
const (
	// Convenience aliases so that the package name "vc" does not
	// conflict with the variables named "vc". Several functions in this
	// file declare a local variable called vc, which shadows the package
	// within its scope.
	defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow
	sharedFlowID                = vc.SharedFlowID
)
157
158// InternalNewDialedVIF creates a new virtual interface over the provided
159// network connection, under the assumption that the conn object was created
Jungho Ahncd175b82015-03-27 14:29:40 -0700160// using net.Dial. If onClose is given, it is run in its own goroutine when
161// the vif has been closed.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700162//
163// As the name suggests, this method is intended for use only within packages
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -0700164// placed inside v.io/x/ref/profiles/internal. Code outside the
165// v.io/x/ref/profiles/internal/* packages should never call this method.
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700166func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, versions *iversion.Range, onClose func(*VIF), opts ...stream.VCOpt) (*VIF, error) {
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700167 ctx := getDialContext(opts)
Asim Shankarf4864f42014-11-25 18:53:05 -0800168 if ctx != nil {
169 var span vtrace.Span
Todd Wangad492042015-04-17 15:58:40 -0700170 ctx, span = vtrace.WithNewSpan(ctx, "InternalNewDialedVIF")
Asim Shankarf4864f42014-11-25 18:53:05 -0800171 span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr())
172 defer span.Finish()
173 }
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800174 pool := iobuf.NewPool(0)
175 reader := iobuf.NewReader(pool, conn)
Matt Rosencrantz5c7ed212015-02-27 22:42:35 -0800176 params := security.CallParams{LocalPrincipal: principal, LocalEndpoint: localEP(conn, rid, versions)}
Ankur50a5f392015-02-27 18:46:30 -0800177
178 // TODO(ataly, ashankar, suharshs): Figure out what authorization policy to use
179 // for authenticating the server during VIF establishment. Note that we cannot
180 // use the VC.ServerAuthorizer available in 'opts' as that applies to the end
181 // server and not the remote endpoint of the VIF.
182 c, err := AuthenticateAsClient(conn, reader, versions, params, nil)
Jason Hickey96d30e82014-11-13 07:40:00 -0800183 if err != nil {
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700184 return nil, verror.New(stream.ErrNetwork, ctx, err)
Jason Hickey96d30e82014-11-13 07:40:00 -0800185 }
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700186 var blessings security.Blessings
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700187
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700188 if principal != nil {
189 blessings = principal.BlessingStore().Default()
190 }
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700191 var startTimeout time.Duration
192 for _, o := range opts {
193 switch v := o.(type) {
194 case vc.StartTimeout:
195 startTimeout = v.Duration
196 }
197 }
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700198 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700199}
200
201// InternalNewAcceptedVIF creates a new virtual interface over the provided
202// network connection, under the assumption that the conn object was created
Jungho Ahncd175b82015-03-27 14:29:40 -0700203// using an Accept call on a net.Listener object. If onClose is given, it is
204// run in its own goroutine when the vif has been closed.
Asim Shankar3d133872014-05-16 23:16:31 -0700205//
206// The returned VIF is also setup for accepting new VCs and Flows with the provided
207// ListenerOpts.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700208//
209// As the name suggests, this method is intended for use only within packages
Suharsh Sivakumar8646ba62015-03-18 15:22:28 -0700210// placed inside v.io/x/ref/profiles/internal. Code outside the
211// v.io/x/ref/profiles/internal/* packages should never call this method.
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700212func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800213 pool := iobuf.NewPool(0)
214 reader := iobuf.NewReader(pool, conn)
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700215
216 dischargeClient := getDischargeClient(lopts)
217
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700218 c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient)
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700219 if err != nil {
220 return nil, err
221 }
222
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700223 var startTimeout time.Duration
224 for _, o := range lopts {
225 switch v := o.(type) {
226 case vc.StartTimeout:
227 startTimeout = v.Duration
228 }
229 }
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700230 return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700231}
232
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700233func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700234 var (
235 // Choose IDs that will not conflict with any other (VC, Flow)
236 // pairs. VCI 0 is never used by the application (it is
237 // reserved for control messages), so steal from the Flow space
238 // there.
239 expressID bqueue.ID = packIDs(0, 0)
240 flowID bqueue.ID = packIDs(0, 1)
241 stopID bqueue.ID = packIDs(0, 2)
242 )
243 outgoing := drrqueue.New(vc.MaxPayloadSizeBytes)
244
245 expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow)
246 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700247 return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForXpress, nil, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700248 }
249 expressQ.Release(-1) // Disable flow control
250
Jungho Ahn60408fa2015-03-27 15:28:22 -0700251 flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700252 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700253 return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForControl, nil, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700254 }
255 flowQ.Release(-1) // Disable flow control
256
257 stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1)
258 if err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700259 return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForStopping, nil, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700260 }
261 stopQ.Release(-1) // Disable flow control
262
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700263 if versions == nil {
264 versions = iversion.SupportedRange
265 }
266
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700267 vif := &VIF{
268 conn: conn,
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800269 pool: pool,
270 reader: reader,
Jason Hickey96d30e82014-11-13 07:40:00 -0800271 ctrlCipher: c,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700272 vcMap: newVCMap(),
Asim Shankar3d133872014-05-16 23:16:31 -0700273 acceptor: acceptor,
274 listenerOpts: listenerOpts,
Suharsh Sivakumar59c423c2015-03-11 14:06:03 -0700275 principal: principal,
Ankur50a5f392015-02-27 18:46:30 -0800276 localEP: localEP(conn, rid, versions),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700277 nextVCI: initialVCI,
278 outgoing: outgoing,
279 expressQ: expressQ,
280 flowQ: flowQ,
281 flowCounters: message.NewCounters(),
282 stopQ: stopQ,
283 versions: versions,
Jungho Ahncd175b82015-03-27 14:29:40 -0700284 onClose: onClose,
Robin Thellend5bd72422015-02-17 12:36:38 -0800285 msgCounters: make(map[string]int64),
Suharsh Sivakumare5e5dcc2015-03-18 14:29:31 -0700286 blessings: blessings,
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700287 }
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700288 if startTimeout > 0 {
289 vif.startTimer = newTimer(startTimeout, vif.Close)
290 }
Jungho Ahncd175b82015-03-27 14:29:40 -0700291 vif.idleTimerMap = newIdleTimerMap(func(vci id.VC) {
292 vc, _, _ := vif.vcMap.Find(vci)
293 if vc != nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700294 vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil))
Jungho Ahncd175b82015-03-27 14:29:40 -0700295 }
296 })
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700297 go vif.readLoop()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700298 go vif.writeLoop()
299 return vif, nil
300}
301
302// Dial creates a new VC to the provided remote identity, authenticating the VC
303// with the provided local identity.
Suharsh Sivakumar2ad4e102015-03-17 21:23:37 -0700304func (vif *VIF) Dial(remoteEP naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.VC, error) {
Jungho Ahncd175b82015-03-27 14:29:40 -0700305 var idleTimeout time.Duration
306 for _, o := range opts {
307 switch v := o.(type) {
308 case vc.IdleTimeout:
309 idleTimeout = v.Duration
310 }
311 }
312 vc, err := vif.newVC(vif.allocVCI(), vif.localEP, remoteEP, idleTimeout, true)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700313 if err != nil {
314 return nil, err
315 }
316 counters := message.NewCounters()
317 counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow)
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700318
Matt Rosencrantzc16339c2015-04-23 10:47:06 -0700319 sendPublicKey := func(pubKey *crypto.BoxKey) error {
320 var options []message.SetupOption
321 if pubKey != nil {
322 options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700323 }
Matt Rosencrantzc16339c2015-04-23 10:47:06 -0700324 err := vif.sendOnExpressQ(&message.SetupVC{
325 VCI: vc.VCI(),
326 RemoteEndpoint: remoteEP,
327 LocalEndpoint: vif.localEP,
328 Counters: counters,
329 Setup: message.Setup{
330 Versions: *vif.versions,
331 Options: options,
332 },
333 })
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700334 if err != nil {
Matt Rosencrantzc16339c2015-04-23 10:47:06 -0700335 err = verror.New(stream.ErrNetwork, nil,
336 verror.New(errSendOnExpressQFailed, nil, err))
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700337 }
Matt Rosencrantzc16339c2015-04-23 10:47:06 -0700338 return err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700339 }
Matt Rosencrantzc16339c2015-04-23 10:47:06 -0700340 if err = vc.HandshakeDialedVC(principal, sendPublicKey, opts...); err != nil {
Jungho Ahncd175b82015-03-27 14:29:40 -0700341 vif.deleteVC(vc.VCI())
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700342 vc.Close(err)
343 return nil, err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700344 }
345 return vc, nil
346}
347
Robin Thellend2224ffa2015-02-14 21:28:27 -0800348// addSet adds a set to the list of sets this VIF is in. This method is called
349// by Set.Insert().
350func (vif *VIF) addSet(s *Set) {
351 vif.muSets.Lock()
352 defer vif.muSets.Unlock()
353 vif.sets = append(vif.sets, s)
354}
355
356// removeSet removes a set from the list of sets this VIF is in. This method is
357// called by Set.Delete().
358func (vif *VIF) removeSet(s *Set) {
359 vif.muSets.Lock()
360 defer vif.muSets.Unlock()
361 for ix, vs := range vif.sets {
362 if vs == s {
363 vif.sets = append(vif.sets[:ix], vif.sets[ix+1:]...)
364 return
365 }
366 }
Robin Thellend2224ffa2015-02-14 21:28:27 -0800367}
368
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700369// Close closes all VCs (and thereby Flows) over the VIF and then closes the
370// underlying network connection after draining all pending writes on those
371// VCs.
372func (vif *VIF) Close() {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800373 vif.isClosedMu.Lock()
374 if vif.isClosed {
375 vif.isClosedMu.Unlock()
376 return
377 }
378 vif.isClosed = true
379 vif.isClosedMu.Unlock()
380
Robin Thellend2224ffa2015-02-14 21:28:27 -0800381 vif.muSets.Lock()
382 sets := vif.sets
383 vif.sets = nil
384 vif.muSets.Unlock()
385 for _, s := range sets {
386 s.Delete(vif)
387 }
388
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700389 vlog.VI(1).Infof("Closing VIF %s", vif)
390 // Stop accepting new VCs.
391 vif.StopAccepting()
392 // Close local datastructures for all existing VCs.
393 vcs := vif.vcMap.Freeze()
Jungho Ahncd175b82015-03-27 14:29:40 -0700394 // Stop the idle timers.
395 vif.idleTimerMap.Stop()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700396 for _, vc := range vcs {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700397 vc.VC.Close(verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil)))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700398 }
399 // Wait for the vcWriteLoops to exit (after draining queued up messages).
400 vif.stopQ.Close()
Asim Shankarf5781102014-06-26 22:05:35 -0700401 vif.wpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700402 // Close the underlying network connection.
403 // No need to send individual messages to close all pending VCs since
404 // the remote end should know to close all VCs when the VIF's
405 // connection breaks.
406 if err := vif.conn.Close(); err != nil {
407 vlog.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err)
408 }
Jungho Ahncd175b82015-03-27 14:29:40 -0700409 // Notify that the VIF has been closed.
410 if vif.onClose != nil {
411 go vif.onClose(vif)
412 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700413}
414
415// StartAccepting begins accepting Flows (and VCs) initiated by the remote end
416// of a VIF. opts is used to setup the listener on newly established VCs.
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700417func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error {
418 vif.muListen.Lock()
419 defer vif.muListen.Unlock()
420 if vif.acceptor != nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700421 return verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil, vif))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700422 }
423 vif.acceptor = upcqueue.New()
424 vif.listenerOpts = opts
425 return nil
426}
427
428// StopAccepting prevents any Flows initiated by the remote end of a VIF from
429// being accepted and causes any existing and future calls to Accept to fail
430// immediately.
431func (vif *VIF) StopAccepting() {
432 vif.muListen.Lock()
433 defer vif.muListen.Unlock()
434 if vif.acceptor != nil {
435 vif.acceptor.Shutdown()
436 vif.acceptor = nil
437 vif.listenerOpts = nil
438 }
439}
440
441// Accept returns the (stream.Connector, stream.Flow) pair of a newly
442// established VC and/or Flow.
443//
444// Sample usage:
445// for {
446// cAndf, err := vif.Accept()
447// switch {
448// case err != nil:
449// fmt.Println("Accept error:", err)
450// return
451// case cAndf.Flow == nil:
452// fmt.Println("New VC established:", cAndf.Connector)
453// default:
454// fmt.Println("New flow established")
455// go handleFlow(cAndf.Flow)
456// }
457// }
458func (vif *VIF) Accept() (ConnectorAndFlow, error) {
459 vif.muListen.Lock()
460 acceptor := vif.acceptor
461 vif.muListen.Unlock()
462 if acceptor == nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700463 return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errVCsNotAcceptedOnVIF, nil, vif))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700464 }
465 item, err := acceptor.Get(nil)
466 if err != nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700467 return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700468 }
469 return item.(ConnectorAndFlow), nil
470}
471
472func (vif *VIF) String() string {
473 l := vif.conn.LocalAddr()
474 r := vif.conn.RemoteAddr()
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700475 return fmt.Sprintf("(%s, %s) <-> (%s, %s)", l.Network(), l, r.Network(), r)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700476}
477
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700478func (vif *VIF) readLoop() {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700479 defer vif.Close()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700480 defer vif.stopVCDispatchLoops()
481 for {
Jason Hickey96d30e82014-11-13 07:40:00 -0800482 // vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation
483 // to it is in handleMessage, which runs in the same goroutine, so a
484 // lock is not required here.
Matt Rosencrantz6902de22015-04-24 13:02:32 -0700485 msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher)
486 if err != nil {
487 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err)
488 return
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700489 }
490 vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800491 if err := vif.handleMessage(msg); err != nil {
492 vlog.VI(1).Infof("Exiting readLoop of VIF %s because of message error: %v", vif, err)
493 return
494 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700495 }
496}
497
Jason Hickey96d30e82014-11-13 07:40:00 -0800498// handleMessage handles a single incoming message. Any error returned is
499// fatal, causing the VIF to close.
500func (vif *VIF) handleMessage(msg message.T) error {
Robin Thellend5bd72422015-02-17 12:36:38 -0800501 vif.muMsgCounters.Lock()
502 vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++
503 vif.muMsgCounters.Unlock()
504
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700505 switch m := msg.(type) {
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700506
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700507 case *message.Data:
508 _, rq, _ := vif.vcMap.Find(m.VCI)
509 if rq == nil {
510 vlog.VI(2).Infof("Ignoring message of %d bytes for unrecognized VCI %d on VIF %s", m.Payload.Size(), m.VCI, vif)
511 m.Release()
Jason Hickey96d30e82014-11-13 07:40:00 -0800512 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700513 }
514 if err := rq.Put(m, nil); err != nil {
515 vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err)
516 m.Release()
517 }
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700518
Asim Shankarcd612e12015-02-24 15:29:52 -0800519 case *message.SetupVC:
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700520 // First, find the public key we need out of the message.
521 var theirPK *crypto.BoxKey
522 box := m.Setup.NaclBox()
523 if box != nil {
524 theirPK = &box.PublicKey
525 }
526
527 // If we dialed this VC, then this is a response and we should finish
528 // the vc handshake. Otherwise, this message is opening a new VC.
529 if vif.dialedVCI(m.VCI) {
530 vif.distributeCounters(m.Counters)
531 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
532 intersection, err := vif.versions.Intersect(&m.Setup.Versions)
533 if err != nil {
534 vif.closeVCAndSendMsg(vc, false, err)
535 } else if err := vc.FinishHandshakeDialedVC(intersection.Max, theirPK); err != nil {
536 vif.closeVCAndSendMsg(vc, false, err)
537 }
538 return nil
539 }
540 vlog.VI(2).Infof("Ignoring SetupVC message %+v for unknown dialed VC", m)
541 return nil
542 }
543
544 // This is an accepted VC.
545 intersection, err := vif.versions.Intersect(&m.Setup.Versions)
546 if err != nil {
547 vlog.VI(2).Infof("SetupVC message %+v to VIF %s did not present compatible versions: %v", m, vif, err)
548 vif.sendOnExpressQ(&message.CloseVC{
549 VCI: m.VCI,
550 Error: err.Error(),
551 })
552 return nil
553 }
554 vif.muListen.Lock()
555 closed := vif.acceptor == nil || vif.acceptor.IsClosed()
556 lopts := vif.listenerOpts
557 vif.muListen.Unlock()
558 if closed {
559 vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif)
560 vif.sendOnExpressQ(&message.CloseVC{
561 VCI: m.VCI,
562 Error: "VCs not accepted",
563 })
564 return nil
565 }
566 var idleTimeout time.Duration
567 for _, o := range lopts {
568 switch v := o.(type) {
569 case vc.IdleTimeout:
570 idleTimeout = v.Duration
571 }
572 }
573 vc, err := vif.newVC(m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false)
574 if err != nil {
575 vif.sendOnExpressQ(&message.CloseVC{
576 VCI: m.VCI,
577 Error: err.Error(),
578 })
579 return nil
580 }
Asim Shankarcd612e12015-02-24 15:29:52 -0800581 vif.distributeCounters(m.Counters)
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700582 keyExchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) {
583 var options []message.SetupOption
584 if pubKey != nil {
585 options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}}
586 }
587 err = vif.sendOnExpressQ(&message.SetupVC{
588 VCI: m.VCI,
589 Setup: message.Setup{
590 // Note that servers send clients not their actual supported versions,
591 // but the intersected range of the server and client ranges. This
592 // is important because proxies may have adjusted the version ranges
593 // along the way, and we should negotiate a version that is compatible
594 // with all intermediate hops.
595 Versions: *intersection,
596 Options: options,
597 },
598 RemoteEndpoint: m.LocalEndpoint,
599 LocalEndpoint: vif.localEP,
600 // TODO(mattr): Consider adding counters. See associated comment
601 // in vc.go:VC.HandshakeAcceptedVC for more details.
602 })
603 return theirPK, err
604 }
605 go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(intersection.Max, vif.principal, vif.blessings, keyExchanger, lopts...))
606
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700607 case *message.CloseVC:
608 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
Jungho Ahncd175b82015-03-27 14:29:40 -0700609 vif.deleteVC(vc.VCI())
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700610 vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
Cosmos Nicolaou1534b3f2014-12-10 15:30:00 -0800611 // TODO(cnicolaou): it would be nice to have a method on VC
612 // to indicate a 'remote close' rather than a 'local one'. This helps
613 // with error reporting since we expect reads/writes to occur
614 // after a remote close, but not after a local close.
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700615 vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error)))
Jason Hickey96d30e82014-11-13 07:40:00 -0800616 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700617 }
618 vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif)
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700619
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700620 case *message.AddReceiveBuffers:
621 vif.distributeCounters(m.Counters)
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700622
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700623 case *message.OpenFlow:
624 if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
625 if err := vc.AcceptFlow(m.Flow); err != nil {
626 vlog.VI(3).Infof("OpenFlow %+v on VIF %v failed:%v", m, vif, err)
627 cm := &message.Data{VCI: m.VCI, Flow: m.Flow}
628 cm.SetClose()
629 vif.sendOnExpressQ(cm)
Jason Hickey96d30e82014-11-13 07:40:00 -0800630 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700631 }
632 vc.ReleaseCounters(m.Flow, m.InitialCounters)
Jason Hickey96d30e82014-11-13 07:40:00 -0800633 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700634 }
635 vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif)
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -0700636
Matt Rosencrantz1ca47182015-04-22 17:13:28 -0700637 case *message.Setup:
638 vlog.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif)
639
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700640 default:
641 vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif)
642 }
Jason Hickey96d30e82014-11-13 07:40:00 -0800643 return nil
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700644}
645
646func (vif *VIF) vcDispatchLoop(vc *vc.VC, messages *pcqueue.T) {
647 defer vlog.VI(2).Infof("Exiting vcDispatchLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700648 defer vif.rpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700649 for {
650 qm, err := messages.Get(nil)
651 if err != nil {
652 return
653 }
654 m := qm.(*message.Data)
655 if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil {
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700656 vlog.VI(2).Infof("Ignoring data message %v for on VIF %s: %v", m, vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700657 }
658 if m.Close() {
659 vif.shutdownFlow(vc, m.Flow)
660 }
661 }
662}
663
664func (vif *VIF) stopVCDispatchLoops() {
665 vcs := vif.vcMap.Freeze()
666 for _, v := range vcs {
667 v.RQ.Close()
668 }
Asim Shankarf5781102014-06-26 22:05:35 -0700669 vif.rpending.Wait()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700670}
671
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700672func clientVCClosed(err error) bool {
673 // If we've encountered a networking error, then all likelihood the
674 // connection to the client is closed.
675 return verror.ErrorID(err) == stream.ErrNetwork.ID
676}
677
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700678func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) {
679 hr := <-c
680 if hr.Error != nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700681 vif.closeVCAndSendMsg(vc, clientVCClosed(hr.Error), hr.Error)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700682 return
683 }
684
685 vif.muListen.Lock()
686 acceptor := vif.acceptor
687 vif.muListen.Unlock()
688 if acceptor == nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700689 vif.closeVCAndSendMsg(vc, false, verror.New(errFlowsNoLongerAccepted, nil))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700690 return
691 }
692
693 // Notify any listeners that a new VC has been established
694 if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil {
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700695 vif.closeVCAndSendMsg(vc, clientVCClosed(err), verror.New(errVCAcceptFailed, nil, err))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700696 return
697 }
698
699 vlog.VI(2).Infof("Running acceptFlowsLoop for VC %v on VIF %v", vc, vif)
700 for {
701 f, err := hr.Listener.Accept()
702 if err != nil {
Shyam Jayaramandbae76b2014-11-17 12:51:29 -0800703 vlog.VI(2).Infof("Accept failed on VC %v on VIF %v: %v", vc, vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700704 return
705 }
706 if err := acceptor.Put(ConnectorAndFlow{vc, f}); err != nil {
707 vlog.VI(2).Infof("vif.acceptor.Put(%v, %T) on VIF %v failed: %v", vc, f, vif, err)
708 f.Close()
709 return
710 }
711 }
712}
713
714func (vif *VIF) distributeCounters(counters message.Counters) {
715 for cid, bytes := range counters {
716 vc, _, _ := vif.vcMap.Find(cid.VCI())
717 if vc == nil {
718 vlog.VI(2).Infof("Ignoring counters for non-existent VCI %d on VIF %s", cid.VCI(), vif)
719 continue
720 }
721 vc.ReleaseCounters(cid.Flow(), bytes)
722 }
723}
724
725func (vif *VIF) writeLoop() {
726 defer vif.outgoing.Close()
727 defer vif.stopVCWriteLoops()
728 for {
729 writer, bufs, err := vif.outgoing.Get(nil)
730 if err != nil {
731 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err)
732 return
733 }
Robin Thellend5bd72422015-02-17 12:36:38 -0800734 vif.muMsgCounters.Lock()
735 vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++
736 vif.muMsgCounters.Unlock()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700737 switch writer {
738 case vif.expressQ:
739 for _, b := range bufs {
Jason Hickey96d30e82014-11-13 07:40:00 -0800740 if err := vif.writeSerializedMessage(b.Contents); err != nil {
Jungho Ahn4b9a5192015-02-02 13:11:08 -0800741 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because Control message write failed: %s", vif, err)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700742 releaseBufs(bufs)
743 return
744 }
745 b.Release()
746 }
747 case vif.flowQ:
748 msg := &message.AddReceiveBuffers{}
749 // No need to call releaseBufs(bufs) as all bufs are
750 // the exact same value: flowToken.
751 vif.flowMu.Lock()
752 if len(vif.flowCounters) > 0 {
753 msg.Counters = vif.flowCounters
754 vif.flowCounters = message.NewCounters()
755 }
756 vif.flowMu.Unlock()
757 if len(msg.Counters) > 0 {
758 vlog.VI(3).Infof("Sending counters %v on VIF %s", msg.Counters, vif)
Jason Hickey96d30e82014-11-13 07:40:00 -0800759 if err := vif.writeMessage(msg); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700760 vlog.VI(1).Infof("Exiting writeLoop of VIF %s because AddReceiveBuffers message write failed: %v", vif, err)
761 return
762 }
763 }
764 case vif.stopQ:
765 // Lowest-priority queue which will never have any
766 // buffers, Close is the only method called on it.
767 return
768 default:
769 vif.writeDataMessages(writer, bufs)
770 }
771 }
772}
773
774func (vif *VIF) vcWriteLoop(vc *vc.VC, messages *pcqueue.T) {
775 defer vlog.VI(2).Infof("Exiting vcWriteLoop(%v) on VIF %v", vc, vif)
Asim Shankarf5781102014-06-26 22:05:35 -0700776 defer vif.wpending.Done()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700777 for {
778 qm, err := messages.Get(nil)
779 if err != nil {
780 return
781 }
782 m := qm.(*message.Data)
783 m.Payload, err = vc.Encrypt(m.Flow, m.Payload)
784 if err != nil {
785 vlog.Infof("Encryption failed. Flow:%v VC:%v Error:%v", m.Flow, vc, err)
786 }
787 if m.Close() {
788 // The last bytes written on the flow will be sent out
789 // on vif.conn. Local datastructures for the flow can
790 // be cleaned up now.
791 vif.shutdownFlow(vc, m.Flow)
792 }
793 if err == nil {
Jason Hickey96d30e82014-11-13 07:40:00 -0800794 err = vif.writeMessage(m)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700795 }
796 if err != nil {
Bogdan Capritaad5761f2014-09-23 10:56:23 -0700797 // TODO(caprita): Calling closeVCAndSendMsg below causes
798 // a race as described in:
799 // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit
800 //
801 // There should be a finer grained way to fix this, and
802 // there are likely other instances where we should not
803 // be closing the VC.
804 //
805 // For now, commenting out the line below removes the
806 // flakiness from our existing unit tests, but this
807 // needs to be revisited and fixed correctly.
808 //
809 // vif.closeVCAndSendMsg(vc, fmt.Sprintf("write failure: %v", err))
810
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700811 // Drain the queue and exit.
812 for {
813 qm, err := messages.Get(nil)
814 if err != nil {
815 return
816 }
817 qm.(*message.Data).Release()
818 }
819 }
820 }
821}
822
823func (vif *VIF) stopVCWriteLoops() {
824 vcs := vif.vcMap.Freeze()
Jungho Ahncd175b82015-03-27 14:29:40 -0700825 vif.idleTimerMap.Stop()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700826 for _, v := range vcs {
827 v.WQ.Close()
828 }
829}
830
831// sendOnExpressQ adds 'msg' to the expressQ (highest priority queue) of messages to write on the wire.
832func (vif *VIF) sendOnExpressQ(msg message.T) error {
Cosmos Nicolaou82d00d82015-02-10 21:31:00 -0800833 vlog.VI(2).Infof("sendOnExpressQ(%T = %+v) on VIF %s", msg, msg, vif)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700834 var buf bytes.Buffer
Jason Hickey96d30e82014-11-13 07:40:00 -0800835 // Don't encrypt yet, because the message ordering isn't yet determined.
836 // Encryption is performed by vif.writeSerializedMessage() when the
837 // message is actually written to vif.conn.
838 vif.writeMu.Lock()
839 c := vif.ctrlCipher
840 vif.writeMu.Unlock()
841 if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(c)); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700842 return err
843 }
844 return vif.expressQ.Put(iobuf.NewSlice(buf.Bytes()), nil)
845}
846
Jason Hickey96d30e82014-11-13 07:40:00 -0800847// writeMessage writes the message to the channel. Writes must be serialized so
848// that the control channel can be encrypted, so we acquire the writeMu.
849func (vif *VIF) writeMessage(msg message.T) error {
850 vif.writeMu.Lock()
851 defer vif.writeMu.Unlock()
852 return message.WriteTo(vif.conn, msg, vif.ctrlCipher)
853}
854
855// Write writes the message to the channel, encrypting the control data. Writes
856// must be serialized so that the control channel can be encrypted, so we
857// acquire the writeMu.
858func (vif *VIF) writeSerializedMessage(msg []byte) error {
859 vif.writeMu.Lock()
860 defer vif.writeMu.Unlock()
861 if err := message.EncryptMessage(msg, vif.ctrlCipher); err != nil {
862 return err
863 }
864 if n, err := vif.conn.Write(msg); err != nil {
Cosmos Nicolaou185c0c62015-04-13 21:22:43 -0700865 return verror.New(stream.ErrNetwork, nil, verror.New(errWriteFailed, nil, n, err, len(msg)))
Jason Hickey96d30e82014-11-13 07:40:00 -0800866 }
867 return nil
868}
869
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700870func (vif *VIF) writeDataMessages(writer bqueue.Writer, bufs []*iobuf.Slice) {
871 vci, fid := unpackIDs(writer.ID())
872 // iobuf.Coalesce will coalesce buffers only if they are adjacent to
873 // each other. In the worst case, each buf will be non-adjacent to the
874 // others and the code below will end up with multiple small writes
875 // instead of a single big one.
876 // Might want to investigate this and see if this needs to be
877 // revisited.
878 bufs = iobuf.Coalesce(bufs, uint(vc.MaxPayloadSizeBytes))
879 _, _, wq := vif.vcMap.Find(vci)
880 if wq == nil {
881 // VC has been removed, stop sending messages
882 vlog.VI(2).Infof("VCI %d on VIF %s was shutdown, dropping %d messages that were pending a write", vci, vif, len(bufs))
883 releaseBufs(bufs)
884 return
885 }
886 last := len(bufs) - 1
887 drained := writer.IsDrained()
888 for i, b := range bufs {
889 d := &message.Data{VCI: vci, Flow: fid, Payload: b}
890 if drained && i == last {
891 d.SetClose()
892 }
893 if err := wq.Put(d, nil); err != nil {
894 releaseBufs(bufs[i:])
895 return
896 }
897 }
898 if len(bufs) == 0 && drained {
899 d := &message.Data{VCI: vci, Flow: fid}
900 d.SetClose()
901 if err := wq.Put(d, nil); err != nil {
902 d.Release()
903 }
904 }
905}
906
Matt Rosencrantz0e207172015-04-16 14:58:02 -0700907func (vif *VIF) dialedVCI(VCI id.VC) bool {
908 return vif.nextVCI%2 == VCI%2
909}
910
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700911func (vif *VIF) allocVCI() id.VC {
912 vif.muNextVCI.Lock()
913 ret := vif.nextVCI
914 vif.nextVCI += 2
915 vif.muNextVCI.Unlock()
916 return ret
917}
918
Jungho Ahncd175b82015-03-27 14:29:40 -0700919func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, dialed bool) (*vc.VC, error) {
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700920 vif.muStartTimer.Lock()
921 if vif.startTimer != nil {
922 vif.startTimer.Stop()
923 vif.startTimer = nil
924 }
925 vif.muStartTimer.Unlock()
Jungho Ahncd175b82015-03-27 14:29:40 -0700926 macSize := vif.ctrlCipher.MACSize()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700927 vc := vc.InternalNew(vc.Params{
928 VCI: vci,
929 Dialed: dialed,
930 LocalEP: localEP,
931 RemoteEP: remoteEP,
932 Pool: vif.pool,
Jungho Ahncd175b82015-03-27 14:29:40 -0700933 ReserveBytes: uint(message.HeaderSizeBytes + macSize),
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700934 Helper: vcHelper{vif},
935 })
936 added, rq, wq := vif.vcMap.Insert(vc)
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700937 if added {
938 vif.idleTimerMap.Insert(vc.VCI(), idleTimeout)
939 }
Asim Shankarf5781102014-06-26 22:05:35 -0700940 // Start vcWriteLoop
941 if added = added && vif.wpending.TryAdd(); added {
942 go vif.vcWriteLoop(vc, wq)
943 }
944 // Start vcDispatchLoop
945 if added = added && vif.rpending.TryAdd(); added {
946 go vif.vcDispatchLoop(vc, rq)
947 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700948 if !added {
Asim Shankarf5781102014-06-26 22:05:35 -0700949 if rq != nil {
950 rq.Close()
951 }
952 if wq != nil {
953 wq.Close()
954 }
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700955 vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)))
Jungho Ahn6ab655f2015-04-14 18:27:09 -0700956 vif.deleteVC(vci)
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700957 return nil, verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700958 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700959 return vc, nil
960}
961
Jungho Ahncd175b82015-03-27 14:29:40 -0700962func (vif *VIF) deleteVC(vci id.VC) {
963 vif.idleTimerMap.Delete(vci)
964 if vif.vcMap.Delete(vci) {
965 vif.Close()
966 }
967}
968
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700969func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, clientVCClosed bool, errMsg error) {
970 vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, errMsg)
Jungho Ahncd175b82015-03-27 14:29:40 -0700971 vif.deleteVC(vc.VCI())
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700972 vc.Close(errMsg)
973 if clientVCClosed {
974 // No point in sending to the client if the VC is closed, or otherwise broken.
Suharsh Sivakumar9b0343a2015-02-28 20:30:21 -0800975 return
976 }
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -0700977 msg := ""
978 if errMsg != nil {
979 msg = errMsg.Error()
980 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700981 if err := vif.sendOnExpressQ(&message.CloseVC{
982 VCI: vc.VCI(),
983 Error: msg,
984 }); err != nil {
985 vlog.VI(2).Infof("sendOnExpressQ(CloseVC{VCI:%d,...}) on VIF %v failed: %v", vc.VCI(), vif, err)
986 }
987}
988
989// shutdownFlow clears out all the datastructures associated with fid.
990func (vif *VIF) shutdownFlow(vc *vc.VC, fid id.Flow) {
991 vc.ShutdownFlow(fid)
992 vif.flowMu.Lock()
993 delete(vif.flowCounters, message.MakeCounterID(vc.VCI(), fid))
994 vif.flowMu.Unlock()
Jungho Ahncd175b82015-03-27 14:29:40 -0700995 vif.idleTimerMap.DeleteFlow(vc.VCI(), fid)
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700996}
997
998// ShutdownVCs closes all VCs established to the provided remote endpoint.
999// Returns the number of VCs that were closed.
1000func (vif *VIF) ShutdownVCs(remote naming.Endpoint) int {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001001 vcs := vif.vcMap.List()
1002 n := 0
1003 for _, vc := range vcs {
Jungho Ahncd175b82015-03-27 14:29:40 -07001004 if naming.Compare(vc.RemoteEndpoint().RoutingID(), remote.RoutingID()) {
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001005 vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif)
Cosmos Nicolaou9fb10342015-04-12 19:37:24 -07001006 vif.closeVCAndSendMsg(vc, false, nil)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001007 n++
1008 }
1009 }
1010 return n
1011}
1012
1013// NumVCs returns the number of VCs established over this VIF.
1014func (vif *VIF) NumVCs() int { return vif.vcMap.Size() }
1015
1016// DebugString returns a descriptive state of the VIF.
1017//
1018// The returned string is meant for consumptions by humans. The specific format
1019// should not be relied upon by any automated processing.
1020func (vif *VIF) DebugString() string {
1021 vcs := vif.vcMap.List()
1022 l := make([]string, 0, len(vcs)+1)
1023
1024 vif.muNextVCI.Lock() // Needed for vif.nextVCI
Matt Rosencrantz1ca47182015-04-22 17:13:28 -07001025 l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.ctrlCipher != nullCipher, vif.isClosed))
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001026 vif.muNextVCI.Unlock()
1027
1028 for _, vc := range vcs {
1029 l = append(l, vc.DebugString())
1030 }
Robin Thellend5bd72422015-02-17 12:36:38 -08001031
1032 l = append(l, "Message Counters:")
1033 ctrs := len(l)
1034 vif.muMsgCounters.Lock()
1035 for k, v := range vif.msgCounters {
1036 l = append(l, fmt.Sprintf(" %-32s %10d", k, v))
1037 }
1038 vif.muMsgCounters.Unlock()
1039 sort.Strings(l[ctrs:])
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001040 return strings.Join(l, "\n")
1041}
1042
1043// Methods and type that implement vc.Helper
Jungho Ahn4b9a5192015-02-02 13:11:08 -08001044//
1045// We create a separate type for vc.Helper to hide the vc.Helper methods
1046// from the exported method set of VIF.
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001047type vcHelper struct{ vif *VIF }
1048
1049func (h vcHelper) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) {
1050 h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)})
1051}
1052
1053func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) {
1054 if bytes == 0 {
1055 return
1056 }
1057 h.vif.flowMu.Lock()
1058 h.vif.flowCounters.Add(vci, fid, uint32(bytes))
1059 h.vif.flowMu.Unlock()
1060 h.vif.flowQ.TryPut(flowToken)
1061}
1062
Jungho Ahn60408fa2015-03-27 15:28:22 -07001063func (h vcHelper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) {
Jungho Ahncd175b82015-03-27 14:29:40 -07001064 h.vif.idleTimerMap.InsertFlow(vci, fid)
Jungho Ahn60408fa2015-03-27 15:28:22 -07001065 return h.vif.outgoing.NewWriter(packIDs(vci, fid), flowPriority+priority, defaultBytesBufferedPerFlow)
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001066}
1067
1068// The token added to vif.flowQ.
1069var flowToken *iobuf.Slice
1070
1071func init() {
1072 // flowToken must be non-empty otherwise bqueue.Writer.Put will ignore it.
1073 flowToken = iobuf.NewSlice(make([]byte, 1))
1074}
1075
1076func packIDs(vci id.VC, fid id.Flow) bqueue.ID {
1077 return bqueue.ID(message.MakeCounterID(vci, fid))
1078}
1079
1080func unpackIDs(b bqueue.ID) (id.VC, id.Flow) {
1081 cid := message.CounterID(b)
1082 return cid.VCI(), cid.Flow()
1083}
1084
1085func releaseBufs(bufs []*iobuf.Slice) {
1086 for _, b := range bufs {
1087 b.Release()
1088 }
1089}
Ankur50a5f392015-02-27 18:46:30 -08001090
Asim Shankar7171a252015-03-07 14:41:40 -08001091// localEP creates a naming.Endpoint from the provided parameters.
1092//
1093// It intentionally does not include any blessings (present in endpoints in the
Matt Rosencrantzc16339c2015-04-23 10:47:06 -07001094// v5 format). At this point it is not clear whether the endpoint is being
Asim Shankar7171a252015-03-07 14:41:40 -08001095// created for a "client" or a "server". If the endpoint is used for clients
1096// (i.e., for those sending an OpenVC message for example), then we do NOT want
1097// to include the blessings in the endpoint to ensure client privacy.
1098//
1099// Servers should be happy to let anyone with access to their endpoint string
1100// know their blessings, because they are willing to share those with anyone
1101// that connects to them.
1102//
1103// The addition of the endpoints is left as an excercise to higher layers of
1104// the stack, where the desire to share or hide blessings from the endpoint is
1105// clearer.
Matt Rosencrantz0e207172015-04-16 14:58:02 -07001106func localEP(conn net.Conn, rid naming.RoutingID, versions *iversion.Range) naming.Endpoint {
Ankur50a5f392015-02-27 18:46:30 -08001107 localAddr := conn.LocalAddr()
Matt Rosencrantzc16339c2015-04-23 10:47:06 -07001108 ep := &inaming.Endpoint{
1109 Protocol: localAddr.Network(),
1110 Address: localAddr.String(),
1111 RID: rid,
Ankur50a5f392015-02-27 18:46:30 -08001112 }
1113 return ep
1114}
1115
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001116// getDialContext returns the DialContext for this call.
1117func getDialContext(vopts []stream.VCOpt) *context.T {
1118 for _, o := range vopts {
Ankur50a5f392015-02-27 18:46:30 -08001119 switch v := o.(type) {
1120 case vc.DialContext:
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001121 return v.T
Ankur50a5f392015-02-27 18:46:30 -08001122 }
1123 }
Suharsh Sivakumar2c5d8102015-03-23 08:49:12 -07001124 return nil
Ankur50a5f392015-02-27 18:46:30 -08001125}