Jiri Simsa | d7616c9 | 2015-03-24 23:44:30 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 5 | package vif |
| 6 | |
| 7 | // Logging guidelines: |
| 8 | // vlog.VI(1) for per-net.Conn information |
| 9 | // vlog.VI(2) for per-VC information |
| 10 | // vlog.VI(3) for per-Flow information |
| 11 | |
| 12 | import ( |
| 13 | "bytes" |
| 14 | "fmt" |
| 15 | "net" |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 16 | "sort" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 17 | "strings" |
| 18 | "sync" |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 19 | "time" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 20 | |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 21 | "v.io/v23/context" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 22 | "v.io/v23/naming" |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 23 | "v.io/v23/security" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 24 | "v.io/v23/verror" |
Jiri Simsa | 6ac9522 | 2015-02-23 16:11:49 -0800 | [diff] [blame] | 25 | "v.io/v23/vtrace" |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 26 | |
Jiri Simsa | 337af23 | 2015-02-27 14:36:46 -0800 | [diff] [blame] | 27 | "v.io/x/lib/vlog" |
Matt Rosencrantz | dbc1be2 | 2015-02-28 15:15:49 -0800 | [diff] [blame] | 28 | "v.io/x/ref/profiles/internal/lib/bqueue" |
| 29 | "v.io/x/ref/profiles/internal/lib/bqueue/drrqueue" |
| 30 | "v.io/x/ref/profiles/internal/lib/iobuf" |
| 31 | "v.io/x/ref/profiles/internal/lib/pcqueue" |
Matt Rosencrantz | 86ba1a1 | 2015-03-09 13:19:02 -0700 | [diff] [blame] | 32 | vsync "v.io/x/ref/profiles/internal/lib/sync" |
| 33 | "v.io/x/ref/profiles/internal/lib/upcqueue" |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 34 | inaming "v.io/x/ref/profiles/internal/naming" |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 35 | "v.io/x/ref/profiles/internal/rpc/stream" |
| 36 | "v.io/x/ref/profiles/internal/rpc/stream/crypto" |
| 37 | "v.io/x/ref/profiles/internal/rpc/stream/id" |
| 38 | "v.io/x/ref/profiles/internal/rpc/stream/message" |
| 39 | "v.io/x/ref/profiles/internal/rpc/stream/vc" |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 40 | iversion "v.io/x/ref/profiles/internal/rpc/version" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 41 | ) |
| 42 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 43 | const pkgPath = "v.io/x/ref/profiles/internal/rpc/stream/vif" |
Mike Burrows | 35baad9 | 2015-02-10 13:42:53 -0800 | [diff] [blame] | 44 | |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 45 | func reg(id, msg string) verror.IDAction { |
| 46 | return verror.Register(verror.ID(pkgPath+id), verror.NoRetry, msg) |
| 47 | } |
| 48 | |
Mike Burrows | 35baad9 | 2015-02-10 13:42:53 -0800 | [diff] [blame] | 49 | var ( |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 50 | // These errors are intended to be used as arguments to higher |
| 51 | // level errors and hence {1}{2} is omitted from their format |
| 52 | // strings to avoid repeating these n-times in the final error |
| 53 | // message visible to the user. |
| 54 | errShuttingDown = reg(".errShuttingDown", "underlying network connection({3}) shutting down") |
| 55 | errVCHandshakeFailed = reg(".errVCHandshakeFailed", "VC handshake failed{:3}") |
| 56 | errSendOnExpressQFailed = reg(".errSendOnExpressQFailed", "vif.sendOnExpressQ(OpenVC) failed{:3}") |
| 57 | errVIFIsBeingClosed = reg(".errVIFIsBeingClosed", "VIF is being closed") |
| 58 | errVIFAlreadyAcceptingFlows = reg(".errVIFAlreadyAcceptingFlows", "already accepting flows on VIF {3}") |
| 59 | errVCsNotAcceptedOnVIF = reg(".errVCsNotAcceptedOnVIF", "VCs not accepted on VIF {3}") |
| 60 | errAcceptFailed = reg(".errAcceptFailed", "Accept failed{:3}") |
| 61 | errRemoteEndClosedVC = reg(".errRemoteEndClosedVC", "remote end closed VC{:3}") |
| 62 | errFlowsNoLongerAccepted = reg(".errFlowsNowLongerAccepted", "Flows no longer being accepted") |
| 63 | errVCAcceptFailed = reg(".errVCAcceptFailed", "VC accept failed{:3}") |
| 64 | errIdleTimeout = reg(".errIdleTimeout", "idle timeout") |
| 65 | errVIFAlreadySetup = reg(".errVIFAlreadySetupt", "VIF is already setup") |
| 66 | errBqueueWriterForXpress = reg(".errBqueueWriterForXpress", "failed to create bqueue.Writer for express messages{:3}") |
| 67 | errBqueueWriterForControl = reg(".errBqueueWriterForControl", "failed to create bqueue.Writer for flow control counters{:3}") |
| 68 | errBqueueWriterForStopping = reg(".errBqueueWriterForStopping", "failed to create bqueue.Writer for stopping the write loop{:3}") |
| 69 | errWriteFailed = reg(".errWriteFailed", "write failed: got ({3}, {4}) for {5} byte message)") |
Mike Burrows | 35baad9 | 2015-02-10 13:42:53 -0800 | [diff] [blame] | 70 | ) |
| 71 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 72 | // VIF implements a "virtual interface" over an underlying network connection |
| 73 | // (net.Conn). Just like multiple network connections can be established over a |
| 74 | // single physical interface, multiple Virtual Circuits (VCs) can be |
| 75 | // established over a single VIF. |
| 76 | type VIF struct { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 77 | // All reads must be performed through reader, and not directly through conn. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 78 | conn net.Conn |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 79 | pool *iobuf.Pool |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 80 | reader *iobuf.Reader |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 81 | localEP naming.Endpoint |
| 82 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 83 | // ctrlCipher is normally guarded by writeMu, however see the exception in |
| 84 | // readLoop. |
| 85 | ctrlCipher crypto.ControlCipher |
| 86 | writeMu sync.Mutex |
| 87 | |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 88 | muStartTimer sync.Mutex |
| 89 | startTimer timer |
| 90 | |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 91 | vcMap *vcMap |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 92 | idleTimerMap *idleTimerMap |
Tilak Sharma | 6d7c39c | 2014-06-27 10:05:37 -0700 | [diff] [blame] | 93 | wpending, rpending vsync.WaitGroup |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 94 | |
| 95 | muListen sync.Mutex |
| 96 | acceptor *upcqueue.T // GUARDED_BY(muListen) |
| 97 | listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen) |
Suharsh Sivakumar | 59c423c | 2015-03-11 14:06:03 -0700 | [diff] [blame] | 98 | principal security.Principal |
Suharsh Sivakumar | e5e5dcc | 2015-03-18 14:29:31 -0700 | [diff] [blame] | 99 | blessings security.Blessings |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 100 | |
| 101 | muNextVCI sync.Mutex |
| 102 | nextVCI id.VC |
| 103 | |
| 104 | outgoing bqueue.T |
| 105 | expressQ bqueue.Writer |
| 106 | |
| 107 | flowQ bqueue.Writer |
| 108 | flowMu sync.Mutex |
| 109 | flowCounters message.Counters |
| 110 | |
| 111 | stopQ bqueue.Writer |
| 112 | |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 113 | // The RPC version range supported by this VIF. In practice this is |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 114 | // non-nil only in testing. nil is equivalent to using the versions |
Matt Rosencrantz | 94502cf | 2015-03-18 09:43:44 -0700 | [diff] [blame] | 115 | // actually supported by this RPC implementation (which is always |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 116 | // what you want outside of tests). |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 117 | versions *iversion.Range |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 118 | |
| 119 | isClosedMu sync.Mutex |
| 120 | isClosed bool // GUARDED_BY(isClosedMu) |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 121 | onClose func(*VIF) |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 122 | |
| 123 | // All sets that this VIF is in. |
| 124 | muSets sync.Mutex |
| 125 | sets []*Set // GUARDED_BY(muSets) |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 126 | |
| 127 | // These counters track the number of messages sent and received by |
| 128 | // this VIF. |
| 129 | muMsgCounters sync.Mutex |
| 130 | msgCounters map[string]int64 |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 131 | } |
| 132 | |
| 133 | // ConnectorAndFlow represents a Flow and the Connector that can be used to |
| 134 | // create another Flow over the same underlying VC. |
| 135 | type ConnectorAndFlow struct { |
| 136 | Connector stream.Connector |
| 137 | Flow stream.Flow |
| 138 | } |
| 139 | |
| 140 | // Separate out constants that are not exported so that godoc looks nicer for |
| 141 | // the exported ones. |
| 142 | const ( |
| 143 | // Priorities of the buffered queues used for flow control of writes. |
| 144 | expressPriority bqueue.Priority = iota |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 145 | controlPriority |
| 146 | // The range of flow priorities is [flowPriority, flowPriority + NumFlowPriorities) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 147 | flowPriority |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 148 | stopPriority = flowPriority + vc.NumFlowPriorities |
| 149 | ) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 150 | |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 151 | const ( |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 152 | // Convenience aliases so that the package name "vc" does not |
| 153 | // conflict with the variables named "vc". |
| 154 | defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow |
| 155 | sharedFlowID = vc.SharedFlowID |
| 156 | ) |
| 157 | |
| 158 | // InternalNewDialedVIF creates a new virtual interface over the provided |
| 159 | // network connection, under the assumption that the conn object was created |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 160 | // using net.Dial. If onClose is given, it is run in its own goroutine when |
| 161 | // the vif has been closed. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 162 | // |
| 163 | // As the name suggests, this method is intended for use only within packages |
Suharsh Sivakumar | 8646ba6 | 2015-03-18 15:22:28 -0700 | [diff] [blame] | 164 | // placed inside v.io/x/ref/profiles/internal. Code outside the |
| 165 | // v.io/x/ref/profiles/internal/* packages should never call this method. |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 166 | func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, versions *iversion.Range, onClose func(*VIF), opts ...stream.VCOpt) (*VIF, error) { |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 167 | ctx := getDialContext(opts) |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 168 | if ctx != nil { |
| 169 | var span vtrace.Span |
Todd Wang | ad49204 | 2015-04-17 15:58:40 -0700 | [diff] [blame] | 170 | ctx, span = vtrace.WithNewSpan(ctx, "InternalNewDialedVIF") |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 171 | span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr()) |
| 172 | defer span.Finish() |
| 173 | } |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 174 | pool := iobuf.NewPool(0) |
| 175 | reader := iobuf.NewReader(pool, conn) |
Matt Rosencrantz | 5c7ed21 | 2015-02-27 22:42:35 -0800 | [diff] [blame] | 176 | params := security.CallParams{LocalPrincipal: principal, LocalEndpoint: localEP(conn, rid, versions)} |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 177 | |
| 178 | // TODO(ataly, ashankar, suharshs): Figure out what authorization policy to use |
| 179 | // for authenticating the server during VIF establishment. Note that we cannot |
| 180 | // use the VC.ServerAuthorizer available in 'opts' as that applies to the end |
| 181 | // server and not the remote endpoint of the VIF. |
| 182 | c, err := AuthenticateAsClient(conn, reader, versions, params, nil) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 183 | if err != nil { |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 184 | return nil, verror.New(stream.ErrNetwork, ctx, err) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 185 | } |
Suharsh Sivakumar | e5e5dcc | 2015-03-18 14:29:31 -0700 | [diff] [blame] | 186 | var blessings security.Blessings |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 187 | |
Suharsh Sivakumar | e5e5dcc | 2015-03-18 14:29:31 -0700 | [diff] [blame] | 188 | if principal != nil { |
| 189 | blessings = principal.BlessingStore().Default() |
| 190 | } |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 191 | var startTimeout time.Duration |
| 192 | for _, o := range opts { |
| 193 | switch v := o.(type) { |
| 194 | case vc.StartTimeout: |
| 195 | startTimeout = v.Duration |
| 196 | } |
| 197 | } |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 198 | return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, principal, blessings, startTimeout, onClose, nil, nil, c) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 199 | } |
| 200 | |
| 201 | // InternalNewAcceptedVIF creates a new virtual interface over the provided |
| 202 | // network connection, under the assumption that the conn object was created |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 203 | // using an Accept call on a net.Listener object. If onClose is given, it is |
| 204 | // run in its own goroutine when the vif has been closed. |
Asim Shankar | 3d13387 | 2014-05-16 23:16:31 -0700 | [diff] [blame] | 205 | // |
| 206 | // The returned VIF is also setup for accepting new VCs and Flows with the provided |
| 207 | // ListenerOpts. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 208 | // |
| 209 | // As the name suggests, this method is intended for use only within packages |
Suharsh Sivakumar | 8646ba6 | 2015-03-18 15:22:28 -0700 | [diff] [blame] | 210 | // placed inside v.io/x/ref/profiles/internal. Code outside the |
| 211 | // v.io/x/ref/profiles/internal/* packages should never call this method. |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 212 | func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, principal security.Principal, blessings security.Blessings, versions *iversion.Range, onClose func(*VIF), lopts ...stream.ListenerOpt) (*VIF, error) { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 213 | pool := iobuf.NewPool(0) |
| 214 | reader := iobuf.NewReader(pool, conn) |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 215 | |
| 216 | dischargeClient := getDischargeClient(lopts) |
| 217 | |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 218 | c, err := AuthenticateAsServer(conn, reader, versions, principal, blessings, dischargeClient) |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 219 | if err != nil { |
| 220 | return nil, err |
| 221 | } |
| 222 | |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 223 | var startTimeout time.Duration |
| 224 | for _, o := range lopts { |
| 225 | switch v := o.(type) { |
| 226 | case vc.StartTimeout: |
| 227 | startTimeout = v.Duration |
| 228 | } |
| 229 | } |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 230 | return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, principal, blessings, startTimeout, onClose, upcqueue.New(), lopts, c) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 231 | } |
| 232 | |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 233 | func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *iversion.Range, principal security.Principal, blessings security.Blessings, startTimeout time.Duration, onClose func(*VIF), acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 234 | var ( |
| 235 | // Choose IDs that will not conflict with any other (VC, Flow) |
| 236 | // pairs. VCI 0 is never used by the application (it is |
| 237 | // reserved for control messages), so steal from the Flow space |
| 238 | // there. |
| 239 | expressID bqueue.ID = packIDs(0, 0) |
| 240 | flowID bqueue.ID = packIDs(0, 1) |
| 241 | stopID bqueue.ID = packIDs(0, 2) |
| 242 | ) |
| 243 | outgoing := drrqueue.New(vc.MaxPayloadSizeBytes) |
| 244 | |
| 245 | expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow) |
| 246 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 247 | return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForXpress, nil, err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 248 | } |
| 249 | expressQ.Release(-1) // Disable flow control |
| 250 | |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 251 | flowQ, err := outgoing.NewWriter(flowID, controlPriority, flowToken.Size()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 252 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 253 | return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForControl, nil, err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 254 | } |
| 255 | flowQ.Release(-1) // Disable flow control |
| 256 | |
| 257 | stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1) |
| 258 | if err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 259 | return nil, verror.New(stream.ErrNetwork, nil, verror.New(errBqueueWriterForStopping, nil, err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 260 | } |
| 261 | stopQ.Release(-1) // Disable flow control |
| 262 | |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 263 | if versions == nil { |
| 264 | versions = iversion.SupportedRange |
| 265 | } |
| 266 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 267 | vif := &VIF{ |
| 268 | conn: conn, |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 269 | pool: pool, |
| 270 | reader: reader, |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 271 | ctrlCipher: c, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 272 | vcMap: newVCMap(), |
Asim Shankar | 3d13387 | 2014-05-16 23:16:31 -0700 | [diff] [blame] | 273 | acceptor: acceptor, |
| 274 | listenerOpts: listenerOpts, |
Suharsh Sivakumar | 59c423c | 2015-03-11 14:06:03 -0700 | [diff] [blame] | 275 | principal: principal, |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 276 | localEP: localEP(conn, rid, versions), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 277 | nextVCI: initialVCI, |
| 278 | outgoing: outgoing, |
| 279 | expressQ: expressQ, |
| 280 | flowQ: flowQ, |
| 281 | flowCounters: message.NewCounters(), |
| 282 | stopQ: stopQ, |
| 283 | versions: versions, |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 284 | onClose: onClose, |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 285 | msgCounters: make(map[string]int64), |
Suharsh Sivakumar | e5e5dcc | 2015-03-18 14:29:31 -0700 | [diff] [blame] | 286 | blessings: blessings, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 287 | } |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 288 | if startTimeout > 0 { |
| 289 | vif.startTimer = newTimer(startTimeout, vif.Close) |
| 290 | } |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 291 | vif.idleTimerMap = newIdleTimerMap(func(vci id.VC) { |
| 292 | vc, _, _ := vif.vcMap.Find(vci) |
| 293 | if vc != nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 294 | vif.closeVCAndSendMsg(vc, false, verror.New(errIdleTimeout, nil)) |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 295 | } |
| 296 | }) |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 297 | go vif.readLoop() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 298 | go vif.writeLoop() |
| 299 | return vif, nil |
| 300 | } |
| 301 | |
| 302 | // Dial creates a new VC to the provided remote identity, authenticating the VC |
| 303 | // with the provided local identity. |
Suharsh Sivakumar | 2ad4e10 | 2015-03-17 21:23:37 -0700 | [diff] [blame] | 304 | func (vif *VIF) Dial(remoteEP naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.VC, error) { |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 305 | var idleTimeout time.Duration |
| 306 | for _, o := range opts { |
| 307 | switch v := o.(type) { |
| 308 | case vc.IdleTimeout: |
| 309 | idleTimeout = v.Duration |
| 310 | } |
| 311 | } |
| 312 | vc, err := vif.newVC(vif.allocVCI(), vif.localEP, remoteEP, idleTimeout, true) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 313 | if err != nil { |
| 314 | return nil, err |
| 315 | } |
| 316 | counters := message.NewCounters() |
| 317 | counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow) |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 318 | |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 319 | sendPublicKey := func(pubKey *crypto.BoxKey) error { |
| 320 | var options []message.SetupOption |
| 321 | if pubKey != nil { |
| 322 | options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}} |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 323 | } |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 324 | err := vif.sendOnExpressQ(&message.SetupVC{ |
| 325 | VCI: vc.VCI(), |
| 326 | RemoteEndpoint: remoteEP, |
| 327 | LocalEndpoint: vif.localEP, |
| 328 | Counters: counters, |
| 329 | Setup: message.Setup{ |
| 330 | Versions: *vif.versions, |
| 331 | Options: options, |
| 332 | }, |
| 333 | }) |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 334 | if err != nil { |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 335 | err = verror.New(stream.ErrNetwork, nil, |
| 336 | verror.New(errSendOnExpressQFailed, nil, err)) |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 337 | } |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 338 | return err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 339 | } |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 340 | if err = vc.HandshakeDialedVC(principal, sendPublicKey, opts...); err != nil { |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 341 | vif.deleteVC(vc.VCI()) |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 342 | vc.Close(err) |
| 343 | return nil, err |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 344 | } |
| 345 | return vc, nil |
| 346 | } |
| 347 | |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 348 | // addSet adds a set to the list of sets this VIF is in. This method is called |
| 349 | // by Set.Insert(). |
| 350 | func (vif *VIF) addSet(s *Set) { |
| 351 | vif.muSets.Lock() |
| 352 | defer vif.muSets.Unlock() |
| 353 | vif.sets = append(vif.sets, s) |
| 354 | } |
| 355 | |
| 356 | // removeSet removes a set from the list of sets this VIF is in. This method is |
| 357 | // called by Set.Delete(). |
| 358 | func (vif *VIF) removeSet(s *Set) { |
| 359 | vif.muSets.Lock() |
| 360 | defer vif.muSets.Unlock() |
| 361 | for ix, vs := range vif.sets { |
| 362 | if vs == s { |
| 363 | vif.sets = append(vif.sets[:ix], vif.sets[ix+1:]...) |
| 364 | return |
| 365 | } |
| 366 | } |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 367 | } |
| 368 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 369 | // Close closes all VCs (and thereby Flows) over the VIF and then closes the |
| 370 | // underlying network connection after draining all pending writes on those |
| 371 | // VCs. |
| 372 | func (vif *VIF) Close() { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 373 | vif.isClosedMu.Lock() |
| 374 | if vif.isClosed { |
| 375 | vif.isClosedMu.Unlock() |
| 376 | return |
| 377 | } |
| 378 | vif.isClosed = true |
| 379 | vif.isClosedMu.Unlock() |
| 380 | |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 381 | vif.muSets.Lock() |
| 382 | sets := vif.sets |
| 383 | vif.sets = nil |
| 384 | vif.muSets.Unlock() |
| 385 | for _, s := range sets { |
| 386 | s.Delete(vif) |
| 387 | } |
| 388 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 389 | vlog.VI(1).Infof("Closing VIF %s", vif) |
| 390 | // Stop accepting new VCs. |
| 391 | vif.StopAccepting() |
| 392 | // Close local datastructures for all existing VCs. |
| 393 | vcs := vif.vcMap.Freeze() |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 394 | // Stop the idle timers. |
| 395 | vif.idleTimerMap.Stop() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 396 | for _, vc := range vcs { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 397 | vc.VC.Close(verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil))) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 398 | } |
| 399 | // Wait for the vcWriteLoops to exit (after draining queued up messages). |
| 400 | vif.stopQ.Close() |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 401 | vif.wpending.Wait() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 402 | // Close the underlying network connection. |
| 403 | // No need to send individual messages to close all pending VCs since |
| 404 | // the remote end should know to close all VCs when the VIF's |
| 405 | // connection breaks. |
| 406 | if err := vif.conn.Close(); err != nil { |
| 407 | vlog.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err) |
| 408 | } |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 409 | // Notify that the VIF has been closed. |
| 410 | if vif.onClose != nil { |
| 411 | go vif.onClose(vif) |
| 412 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 413 | } |
| 414 | |
| 415 | // StartAccepting begins accepting Flows (and VCs) initiated by the remote end |
| 416 | // of a VIF. opts is used to setup the listener on newly established VCs. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 417 | func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error { |
| 418 | vif.muListen.Lock() |
| 419 | defer vif.muListen.Unlock() |
| 420 | if vif.acceptor != nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 421 | return verror.New(stream.ErrNetwork, nil, verror.New(errVIFIsBeingClosed, nil, vif)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 422 | } |
| 423 | vif.acceptor = upcqueue.New() |
| 424 | vif.listenerOpts = opts |
| 425 | return nil |
| 426 | } |
| 427 | |
| 428 | // StopAccepting prevents any Flows initiated by the remote end of a VIF from |
| 429 | // being accepted and causes any existing and future calls to Accept to fail |
| 430 | // immediately. |
| 431 | func (vif *VIF) StopAccepting() { |
| 432 | vif.muListen.Lock() |
| 433 | defer vif.muListen.Unlock() |
| 434 | if vif.acceptor != nil { |
| 435 | vif.acceptor.Shutdown() |
| 436 | vif.acceptor = nil |
| 437 | vif.listenerOpts = nil |
| 438 | } |
| 439 | } |
| 440 | |
| 441 | // Accept returns the (stream.Connector, stream.Flow) pair of a newly |
| 442 | // established VC and/or Flow. |
| 443 | // |
| 444 | // Sample usage: |
| 445 | // for { |
| 446 | // cAndf, err := vif.Accept() |
| 447 | // switch { |
| 448 | // case err != nil: |
| 449 | // fmt.Println("Accept error:", err) |
| 450 | // return |
| 451 | // case cAndf.Flow == nil: |
| 452 | // fmt.Println("New VC established:", cAndf.Connector) |
| 453 | // default: |
| 454 | // fmt.Println("New flow established") |
| 455 | // go handleFlow(cAndf.Flow) |
| 456 | // } |
| 457 | // } |
| 458 | func (vif *VIF) Accept() (ConnectorAndFlow, error) { |
| 459 | vif.muListen.Lock() |
| 460 | acceptor := vif.acceptor |
| 461 | vif.muListen.Unlock() |
| 462 | if acceptor == nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 463 | return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errVCsNotAcceptedOnVIF, nil, vif)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 464 | } |
| 465 | item, err := acceptor.Get(nil) |
| 466 | if err != nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 467 | return ConnectorAndFlow{}, verror.New(stream.ErrNetwork, nil, verror.New(errAcceptFailed, nil, err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 468 | } |
| 469 | return item.(ConnectorAndFlow), nil |
| 470 | } |
| 471 | |
| 472 | func (vif *VIF) String() string { |
| 473 | l := vif.conn.LocalAddr() |
| 474 | r := vif.conn.RemoteAddr() |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 475 | return fmt.Sprintf("(%s, %s) <-> (%s, %s)", l.Network(), l, r.Network(), r) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 476 | } |
| 477 | |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 478 | func (vif *VIF) readLoop() { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 479 | defer vif.Close() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 480 | defer vif.stopVCDispatchLoops() |
| 481 | for { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 482 | // vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation |
| 483 | // to it is in handleMessage, which runs in the same goroutine, so a |
| 484 | // lock is not required here. |
Matt Rosencrantz | 6902de2 | 2015-04-24 13:02:32 -0700 | [diff] [blame] | 485 | msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher) |
| 486 | if err != nil { |
| 487 | vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err) |
| 488 | return |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 489 | } |
| 490 | vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 491 | if err := vif.handleMessage(msg); err != nil { |
| 492 | vlog.VI(1).Infof("Exiting readLoop of VIF %s because of message error: %v", vif, err) |
| 493 | return |
| 494 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 495 | } |
| 496 | } |
| 497 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 498 | // handleMessage handles a single incoming message. Any error returned is |
| 499 | // fatal, causing the VIF to close. |
| 500 | func (vif *VIF) handleMessage(msg message.T) error { |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 501 | vif.muMsgCounters.Lock() |
| 502 | vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++ |
| 503 | vif.muMsgCounters.Unlock() |
| 504 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 505 | switch m := msg.(type) { |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 506 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 507 | case *message.Data: |
| 508 | _, rq, _ := vif.vcMap.Find(m.VCI) |
| 509 | if rq == nil { |
| 510 | vlog.VI(2).Infof("Ignoring message of %d bytes for unrecognized VCI %d on VIF %s", m.Payload.Size(), m.VCI, vif) |
| 511 | m.Release() |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 512 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 513 | } |
| 514 | if err := rq.Put(m, nil); err != nil { |
| 515 | vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err) |
| 516 | m.Release() |
| 517 | } |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 518 | |
Asim Shankar | cd612e1 | 2015-02-24 15:29:52 -0800 | [diff] [blame] | 519 | case *message.SetupVC: |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 520 | // First, find the public key we need out of the message. |
| 521 | var theirPK *crypto.BoxKey |
| 522 | box := m.Setup.NaclBox() |
| 523 | if box != nil { |
| 524 | theirPK = &box.PublicKey |
| 525 | } |
| 526 | |
| 527 | // If we dialed this VC, then this is a response and we should finish |
| 528 | // the vc handshake. Otherwise, this message is opening a new VC. |
| 529 | if vif.dialedVCI(m.VCI) { |
| 530 | vif.distributeCounters(m.Counters) |
| 531 | if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil { |
| 532 | intersection, err := vif.versions.Intersect(&m.Setup.Versions) |
| 533 | if err != nil { |
| 534 | vif.closeVCAndSendMsg(vc, false, err) |
| 535 | } else if err := vc.FinishHandshakeDialedVC(intersection.Max, theirPK); err != nil { |
| 536 | vif.closeVCAndSendMsg(vc, false, err) |
| 537 | } |
| 538 | return nil |
| 539 | } |
| 540 | vlog.VI(2).Infof("Ignoring SetupVC message %+v for unknown dialed VC", m) |
| 541 | return nil |
| 542 | } |
| 543 | |
| 544 | // This is an accepted VC. |
| 545 | intersection, err := vif.versions.Intersect(&m.Setup.Versions) |
| 546 | if err != nil { |
| 547 | vlog.VI(2).Infof("SetupVC message %+v to VIF %s did not present compatible versions: %v", m, vif, err) |
| 548 | vif.sendOnExpressQ(&message.CloseVC{ |
| 549 | VCI: m.VCI, |
| 550 | Error: err.Error(), |
| 551 | }) |
| 552 | return nil |
| 553 | } |
| 554 | vif.muListen.Lock() |
| 555 | closed := vif.acceptor == nil || vif.acceptor.IsClosed() |
| 556 | lopts := vif.listenerOpts |
| 557 | vif.muListen.Unlock() |
| 558 | if closed { |
| 559 | vlog.VI(2).Infof("Ignoring SetupVC message %+v as VIF %s does not accept VCs", m, vif) |
| 560 | vif.sendOnExpressQ(&message.CloseVC{ |
| 561 | VCI: m.VCI, |
| 562 | Error: "VCs not accepted", |
| 563 | }) |
| 564 | return nil |
| 565 | } |
| 566 | var idleTimeout time.Duration |
| 567 | for _, o := range lopts { |
| 568 | switch v := o.(type) { |
| 569 | case vc.IdleTimeout: |
| 570 | idleTimeout = v.Duration |
| 571 | } |
| 572 | } |
| 573 | vc, err := vif.newVC(m.VCI, m.RemoteEndpoint, m.LocalEndpoint, idleTimeout, false) |
| 574 | if err != nil { |
| 575 | vif.sendOnExpressQ(&message.CloseVC{ |
| 576 | VCI: m.VCI, |
| 577 | Error: err.Error(), |
| 578 | }) |
| 579 | return nil |
| 580 | } |
Asim Shankar | cd612e1 | 2015-02-24 15:29:52 -0800 | [diff] [blame] | 581 | vif.distributeCounters(m.Counters) |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 582 | keyExchanger := func(pubKey *crypto.BoxKey) (*crypto.BoxKey, error) { |
| 583 | var options []message.SetupOption |
| 584 | if pubKey != nil { |
| 585 | options = []message.SetupOption{&message.NaclBox{PublicKey: *pubKey}} |
| 586 | } |
| 587 | err = vif.sendOnExpressQ(&message.SetupVC{ |
| 588 | VCI: m.VCI, |
| 589 | Setup: message.Setup{ |
| 590 | // Note that servers send clients not their actual supported versions, |
| 591 | // but the intersected range of the server and client ranges. This |
| 592 | // is important because proxies may have adjusted the version ranges |
| 593 | // along the way, and we should negotiate a version that is compatible |
| 594 | // with all intermediate hops. |
| 595 | Versions: *intersection, |
| 596 | Options: options, |
| 597 | }, |
| 598 | RemoteEndpoint: m.LocalEndpoint, |
| 599 | LocalEndpoint: vif.localEP, |
| 600 | // TODO(mattr): Consider adding counters. See associated comment |
| 601 | // in vc.go:VC.HandshakeAcceptedVC for more details. |
| 602 | }) |
| 603 | return theirPK, err |
| 604 | } |
| 605 | go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(intersection.Max, vif.principal, vif.blessings, keyExchanger, lopts...)) |
| 606 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 607 | case *message.CloseVC: |
| 608 | if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil { |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 609 | vif.deleteVC(vc.VCI()) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 610 | vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif) |
Cosmos Nicolaou | 1534b3f | 2014-12-10 15:30:00 -0800 | [diff] [blame] | 611 | // TODO(cnicolaou): it would be nice to have a method on VC |
| 612 | // to indicate a 'remote close' rather than a 'local one'. This helps |
| 613 | // with error reporting since we expect reads/writes to occur |
| 614 | // after a remote close, but not after a local close. |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 615 | vc.Close(verror.New(stream.ErrNetwork, nil, verror.New(errRemoteEndClosedVC, nil, m.Error))) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 616 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 617 | } |
| 618 | vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif) |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 619 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 620 | case *message.AddReceiveBuffers: |
| 621 | vif.distributeCounters(m.Counters) |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 622 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 623 | case *message.OpenFlow: |
| 624 | if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil { |
| 625 | if err := vc.AcceptFlow(m.Flow); err != nil { |
| 626 | vlog.VI(3).Infof("OpenFlow %+v on VIF %v failed:%v", m, vif, err) |
| 627 | cm := &message.Data{VCI: m.VCI, Flow: m.Flow} |
| 628 | cm.SetClose() |
| 629 | vif.sendOnExpressQ(cm) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 630 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 631 | } |
| 632 | vc.ReleaseCounters(m.Flow, m.InitialCounters) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 633 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 634 | } |
| 635 | vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif) |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 636 | |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 637 | case *message.Setup: |
| 638 | vlog.Infof("Ignoring redundant Setup message %T on VIF %s", m, vif) |
| 639 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 640 | default: |
| 641 | vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif) |
| 642 | } |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 643 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 644 | } |
| 645 | |
| 646 | func (vif *VIF) vcDispatchLoop(vc *vc.VC, messages *pcqueue.T) { |
| 647 | defer vlog.VI(2).Infof("Exiting vcDispatchLoop(%v) on VIF %v", vc, vif) |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 648 | defer vif.rpending.Done() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 649 | for { |
| 650 | qm, err := messages.Get(nil) |
| 651 | if err != nil { |
| 652 | return |
| 653 | } |
| 654 | m := qm.(*message.Data) |
| 655 | if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil { |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 656 | vlog.VI(2).Infof("Ignoring data message %v for on VIF %s: %v", m, vif, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 657 | } |
| 658 | if m.Close() { |
| 659 | vif.shutdownFlow(vc, m.Flow) |
| 660 | } |
| 661 | } |
| 662 | } |
| 663 | |
| 664 | func (vif *VIF) stopVCDispatchLoops() { |
| 665 | vcs := vif.vcMap.Freeze() |
| 666 | for _, v := range vcs { |
| 667 | v.RQ.Close() |
| 668 | } |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 669 | vif.rpending.Wait() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 670 | } |
| 671 | |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 672 | func clientVCClosed(err error) bool { |
| 673 | // If we've encountered a networking error, then all likelihood the |
| 674 | // connection to the client is closed. |
| 675 | return verror.ErrorID(err) == stream.ErrNetwork.ID |
| 676 | } |
| 677 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 678 | func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) { |
| 679 | hr := <-c |
| 680 | if hr.Error != nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 681 | vif.closeVCAndSendMsg(vc, clientVCClosed(hr.Error), hr.Error) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 682 | return |
| 683 | } |
| 684 | |
| 685 | vif.muListen.Lock() |
| 686 | acceptor := vif.acceptor |
| 687 | vif.muListen.Unlock() |
| 688 | if acceptor == nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 689 | vif.closeVCAndSendMsg(vc, false, verror.New(errFlowsNoLongerAccepted, nil)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 690 | return |
| 691 | } |
| 692 | |
| 693 | // Notify any listeners that a new VC has been established |
| 694 | if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil { |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 695 | vif.closeVCAndSendMsg(vc, clientVCClosed(err), verror.New(errVCAcceptFailed, nil, err)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 696 | return |
| 697 | } |
| 698 | |
| 699 | vlog.VI(2).Infof("Running acceptFlowsLoop for VC %v on VIF %v", vc, vif) |
| 700 | for { |
| 701 | f, err := hr.Listener.Accept() |
| 702 | if err != nil { |
Shyam Jayaraman | dbae76b | 2014-11-17 12:51:29 -0800 | [diff] [blame] | 703 | vlog.VI(2).Infof("Accept failed on VC %v on VIF %v: %v", vc, vif, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 704 | return |
| 705 | } |
| 706 | if err := acceptor.Put(ConnectorAndFlow{vc, f}); err != nil { |
| 707 | vlog.VI(2).Infof("vif.acceptor.Put(%v, %T) on VIF %v failed: %v", vc, f, vif, err) |
| 708 | f.Close() |
| 709 | return |
| 710 | } |
| 711 | } |
| 712 | } |
| 713 | |
| 714 | func (vif *VIF) distributeCounters(counters message.Counters) { |
| 715 | for cid, bytes := range counters { |
| 716 | vc, _, _ := vif.vcMap.Find(cid.VCI()) |
| 717 | if vc == nil { |
| 718 | vlog.VI(2).Infof("Ignoring counters for non-existent VCI %d on VIF %s", cid.VCI(), vif) |
| 719 | continue |
| 720 | } |
| 721 | vc.ReleaseCounters(cid.Flow(), bytes) |
| 722 | } |
| 723 | } |
| 724 | |
| 725 | func (vif *VIF) writeLoop() { |
| 726 | defer vif.outgoing.Close() |
| 727 | defer vif.stopVCWriteLoops() |
| 728 | for { |
| 729 | writer, bufs, err := vif.outgoing.Get(nil) |
| 730 | if err != nil { |
| 731 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err) |
| 732 | return |
| 733 | } |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 734 | vif.muMsgCounters.Lock() |
| 735 | vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++ |
| 736 | vif.muMsgCounters.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 737 | switch writer { |
| 738 | case vif.expressQ: |
| 739 | for _, b := range bufs { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 740 | if err := vif.writeSerializedMessage(b.Contents); err != nil { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 741 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because Control message write failed: %s", vif, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 742 | releaseBufs(bufs) |
| 743 | return |
| 744 | } |
| 745 | b.Release() |
| 746 | } |
| 747 | case vif.flowQ: |
| 748 | msg := &message.AddReceiveBuffers{} |
| 749 | // No need to call releaseBufs(bufs) as all bufs are |
| 750 | // the exact same value: flowToken. |
| 751 | vif.flowMu.Lock() |
| 752 | if len(vif.flowCounters) > 0 { |
| 753 | msg.Counters = vif.flowCounters |
| 754 | vif.flowCounters = message.NewCounters() |
| 755 | } |
| 756 | vif.flowMu.Unlock() |
| 757 | if len(msg.Counters) > 0 { |
| 758 | vlog.VI(3).Infof("Sending counters %v on VIF %s", msg.Counters, vif) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 759 | if err := vif.writeMessage(msg); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 760 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because AddReceiveBuffers message write failed: %v", vif, err) |
| 761 | return |
| 762 | } |
| 763 | } |
| 764 | case vif.stopQ: |
| 765 | // Lowest-priority queue which will never have any |
| 766 | // buffers, Close is the only method called on it. |
| 767 | return |
| 768 | default: |
| 769 | vif.writeDataMessages(writer, bufs) |
| 770 | } |
| 771 | } |
| 772 | } |
| 773 | |
| 774 | func (vif *VIF) vcWriteLoop(vc *vc.VC, messages *pcqueue.T) { |
| 775 | defer vlog.VI(2).Infof("Exiting vcWriteLoop(%v) on VIF %v", vc, vif) |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 776 | defer vif.wpending.Done() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 777 | for { |
| 778 | qm, err := messages.Get(nil) |
| 779 | if err != nil { |
| 780 | return |
| 781 | } |
| 782 | m := qm.(*message.Data) |
| 783 | m.Payload, err = vc.Encrypt(m.Flow, m.Payload) |
| 784 | if err != nil { |
| 785 | vlog.Infof("Encryption failed. Flow:%v VC:%v Error:%v", m.Flow, vc, err) |
| 786 | } |
| 787 | if m.Close() { |
| 788 | // The last bytes written on the flow will be sent out |
| 789 | // on vif.conn. Local datastructures for the flow can |
| 790 | // be cleaned up now. |
| 791 | vif.shutdownFlow(vc, m.Flow) |
| 792 | } |
| 793 | if err == nil { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 794 | err = vif.writeMessage(m) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 795 | } |
| 796 | if err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 797 | // TODO(caprita): Calling closeVCAndSendMsg below causes |
| 798 | // a race as described in: |
| 799 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 800 | // |
| 801 | // There should be a finer grained way to fix this, and |
| 802 | // there are likely other instances where we should not |
| 803 | // be closing the VC. |
| 804 | // |
| 805 | // For now, commenting out the line below removes the |
| 806 | // flakiness from our existing unit tests, but this |
| 807 | // needs to be revisited and fixed correctly. |
| 808 | // |
| 809 | // vif.closeVCAndSendMsg(vc, fmt.Sprintf("write failure: %v", err)) |
| 810 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 811 | // Drain the queue and exit. |
| 812 | for { |
| 813 | qm, err := messages.Get(nil) |
| 814 | if err != nil { |
| 815 | return |
| 816 | } |
| 817 | qm.(*message.Data).Release() |
| 818 | } |
| 819 | } |
| 820 | } |
| 821 | } |
| 822 | |
| 823 | func (vif *VIF) stopVCWriteLoops() { |
| 824 | vcs := vif.vcMap.Freeze() |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 825 | vif.idleTimerMap.Stop() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 826 | for _, v := range vcs { |
| 827 | v.WQ.Close() |
| 828 | } |
| 829 | } |
| 830 | |
| 831 | // sendOnExpressQ adds 'msg' to the expressQ (highest priority queue) of messages to write on the wire. |
| 832 | func (vif *VIF) sendOnExpressQ(msg message.T) error { |
Cosmos Nicolaou | 82d00d8 | 2015-02-10 21:31:00 -0800 | [diff] [blame] | 833 | vlog.VI(2).Infof("sendOnExpressQ(%T = %+v) on VIF %s", msg, msg, vif) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 834 | var buf bytes.Buffer |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 835 | // Don't encrypt yet, because the message ordering isn't yet determined. |
| 836 | // Encryption is performed by vif.writeSerializedMessage() when the |
| 837 | // message is actually written to vif.conn. |
| 838 | vif.writeMu.Lock() |
| 839 | c := vif.ctrlCipher |
| 840 | vif.writeMu.Unlock() |
| 841 | if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(c)); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 842 | return err |
| 843 | } |
| 844 | return vif.expressQ.Put(iobuf.NewSlice(buf.Bytes()), nil) |
| 845 | } |
| 846 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 847 | // writeMessage writes the message to the channel. Writes must be serialized so |
| 848 | // that the control channel can be encrypted, so we acquire the writeMu. |
| 849 | func (vif *VIF) writeMessage(msg message.T) error { |
| 850 | vif.writeMu.Lock() |
| 851 | defer vif.writeMu.Unlock() |
| 852 | return message.WriteTo(vif.conn, msg, vif.ctrlCipher) |
| 853 | } |
| 854 | |
| 855 | // Write writes the message to the channel, encrypting the control data. Writes |
| 856 | // must be serialized so that the control channel can be encrypted, so we |
| 857 | // acquire the writeMu. |
| 858 | func (vif *VIF) writeSerializedMessage(msg []byte) error { |
| 859 | vif.writeMu.Lock() |
| 860 | defer vif.writeMu.Unlock() |
| 861 | if err := message.EncryptMessage(msg, vif.ctrlCipher); err != nil { |
| 862 | return err |
| 863 | } |
| 864 | if n, err := vif.conn.Write(msg); err != nil { |
Cosmos Nicolaou | 185c0c6 | 2015-04-13 21:22:43 -0700 | [diff] [blame] | 865 | return verror.New(stream.ErrNetwork, nil, verror.New(errWriteFailed, nil, n, err, len(msg))) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 866 | } |
| 867 | return nil |
| 868 | } |
| 869 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 870 | func (vif *VIF) writeDataMessages(writer bqueue.Writer, bufs []*iobuf.Slice) { |
| 871 | vci, fid := unpackIDs(writer.ID()) |
| 872 | // iobuf.Coalesce will coalesce buffers only if they are adjacent to |
| 873 | // each other. In the worst case, each buf will be non-adjacent to the |
| 874 | // others and the code below will end up with multiple small writes |
| 875 | // instead of a single big one. |
| 876 | // Might want to investigate this and see if this needs to be |
| 877 | // revisited. |
| 878 | bufs = iobuf.Coalesce(bufs, uint(vc.MaxPayloadSizeBytes)) |
| 879 | _, _, wq := vif.vcMap.Find(vci) |
| 880 | if wq == nil { |
| 881 | // VC has been removed, stop sending messages |
| 882 | vlog.VI(2).Infof("VCI %d on VIF %s was shutdown, dropping %d messages that were pending a write", vci, vif, len(bufs)) |
| 883 | releaseBufs(bufs) |
| 884 | return |
| 885 | } |
| 886 | last := len(bufs) - 1 |
| 887 | drained := writer.IsDrained() |
| 888 | for i, b := range bufs { |
| 889 | d := &message.Data{VCI: vci, Flow: fid, Payload: b} |
| 890 | if drained && i == last { |
| 891 | d.SetClose() |
| 892 | } |
| 893 | if err := wq.Put(d, nil); err != nil { |
| 894 | releaseBufs(bufs[i:]) |
| 895 | return |
| 896 | } |
| 897 | } |
| 898 | if len(bufs) == 0 && drained { |
| 899 | d := &message.Data{VCI: vci, Flow: fid} |
| 900 | d.SetClose() |
| 901 | if err := wq.Put(d, nil); err != nil { |
| 902 | d.Release() |
| 903 | } |
| 904 | } |
| 905 | } |
| 906 | |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 907 | func (vif *VIF) dialedVCI(VCI id.VC) bool { |
| 908 | return vif.nextVCI%2 == VCI%2 |
| 909 | } |
| 910 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 911 | func (vif *VIF) allocVCI() id.VC { |
| 912 | vif.muNextVCI.Lock() |
| 913 | ret := vif.nextVCI |
| 914 | vif.nextVCI += 2 |
| 915 | vif.muNextVCI.Unlock() |
| 916 | return ret |
| 917 | } |
| 918 | |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 919 | func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, idleTimeout time.Duration, dialed bool) (*vc.VC, error) { |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 920 | vif.muStartTimer.Lock() |
| 921 | if vif.startTimer != nil { |
| 922 | vif.startTimer.Stop() |
| 923 | vif.startTimer = nil |
| 924 | } |
| 925 | vif.muStartTimer.Unlock() |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 926 | macSize := vif.ctrlCipher.MACSize() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 927 | vc := vc.InternalNew(vc.Params{ |
| 928 | VCI: vci, |
| 929 | Dialed: dialed, |
| 930 | LocalEP: localEP, |
| 931 | RemoteEP: remoteEP, |
| 932 | Pool: vif.pool, |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 933 | ReserveBytes: uint(message.HeaderSizeBytes + macSize), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 934 | Helper: vcHelper{vif}, |
| 935 | }) |
| 936 | added, rq, wq := vif.vcMap.Insert(vc) |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 937 | if added { |
| 938 | vif.idleTimerMap.Insert(vc.VCI(), idleTimeout) |
| 939 | } |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 940 | // Start vcWriteLoop |
| 941 | if added = added && vif.wpending.TryAdd(); added { |
| 942 | go vif.vcWriteLoop(vc, wq) |
| 943 | } |
| 944 | // Start vcDispatchLoop |
| 945 | if added = added && vif.rpending.TryAdd(); added { |
| 946 | go vif.vcDispatchLoop(vc, rq) |
| 947 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 948 | if !added { |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 949 | if rq != nil { |
| 950 | rq.Close() |
| 951 | } |
| 952 | if wq != nil { |
| 953 | wq.Close() |
| 954 | } |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 955 | vc.Close(verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif))) |
Jungho Ahn | 6ab655f | 2015-04-14 18:27:09 -0700 | [diff] [blame] | 956 | vif.deleteVC(vci) |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 957 | return nil, verror.New(stream.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 958 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 959 | return vc, nil |
| 960 | } |
| 961 | |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 962 | func (vif *VIF) deleteVC(vci id.VC) { |
| 963 | vif.idleTimerMap.Delete(vci) |
| 964 | if vif.vcMap.Delete(vci) { |
| 965 | vif.Close() |
| 966 | } |
| 967 | } |
| 968 | |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 969 | func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, clientVCClosed bool, errMsg error) { |
| 970 | vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, errMsg) |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 971 | vif.deleteVC(vc.VCI()) |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 972 | vc.Close(errMsg) |
| 973 | if clientVCClosed { |
| 974 | // No point in sending to the client if the VC is closed, or otherwise broken. |
Suharsh Sivakumar | 9b0343a | 2015-02-28 20:30:21 -0800 | [diff] [blame] | 975 | return |
| 976 | } |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 977 | msg := "" |
| 978 | if errMsg != nil { |
| 979 | msg = errMsg.Error() |
| 980 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 981 | if err := vif.sendOnExpressQ(&message.CloseVC{ |
| 982 | VCI: vc.VCI(), |
| 983 | Error: msg, |
| 984 | }); err != nil { |
| 985 | vlog.VI(2).Infof("sendOnExpressQ(CloseVC{VCI:%d,...}) on VIF %v failed: %v", vc.VCI(), vif, err) |
| 986 | } |
| 987 | } |
| 988 | |
| 989 | // shutdownFlow clears out all the datastructures associated with fid. |
| 990 | func (vif *VIF) shutdownFlow(vc *vc.VC, fid id.Flow) { |
| 991 | vc.ShutdownFlow(fid) |
| 992 | vif.flowMu.Lock() |
| 993 | delete(vif.flowCounters, message.MakeCounterID(vc.VCI(), fid)) |
| 994 | vif.flowMu.Unlock() |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 995 | vif.idleTimerMap.DeleteFlow(vc.VCI(), fid) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 996 | } |
| 997 | |
| 998 | // ShutdownVCs closes all VCs established to the provided remote endpoint. |
| 999 | // Returns the number of VCs that were closed. |
| 1000 | func (vif *VIF) ShutdownVCs(remote naming.Endpoint) int { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1001 | vcs := vif.vcMap.List() |
| 1002 | n := 0 |
| 1003 | for _, vc := range vcs { |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 1004 | if naming.Compare(vc.RemoteEndpoint().RoutingID(), remote.RoutingID()) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1005 | vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif) |
Cosmos Nicolaou | 9fb1034 | 2015-04-12 19:37:24 -0700 | [diff] [blame] | 1006 | vif.closeVCAndSendMsg(vc, false, nil) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1007 | n++ |
| 1008 | } |
| 1009 | } |
| 1010 | return n |
| 1011 | } |
| 1012 | |
| 1013 | // NumVCs returns the number of VCs established over this VIF. |
| 1014 | func (vif *VIF) NumVCs() int { return vif.vcMap.Size() } |
| 1015 | |
| 1016 | // DebugString returns a descriptive state of the VIF. |
| 1017 | // |
| 1018 | // The returned string is meant for consumptions by humans. The specific format |
| 1019 | // should not be relied upon by any automated processing. |
| 1020 | func (vif *VIF) DebugString() string { |
| 1021 | vcs := vif.vcMap.List() |
| 1022 | l := make([]string, 0, len(vcs)+1) |
| 1023 | |
| 1024 | vif.muNextVCI.Lock() // Needed for vif.nextVCI |
Matt Rosencrantz | 1ca4718 | 2015-04-22 17:13:28 -0700 | [diff] [blame] | 1025 | l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.ctrlCipher != nullCipher, vif.isClosed)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1026 | vif.muNextVCI.Unlock() |
| 1027 | |
| 1028 | for _, vc := range vcs { |
| 1029 | l = append(l, vc.DebugString()) |
| 1030 | } |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame] | 1031 | |
| 1032 | l = append(l, "Message Counters:") |
| 1033 | ctrs := len(l) |
| 1034 | vif.muMsgCounters.Lock() |
| 1035 | for k, v := range vif.msgCounters { |
| 1036 | l = append(l, fmt.Sprintf(" %-32s %10d", k, v)) |
| 1037 | } |
| 1038 | vif.muMsgCounters.Unlock() |
| 1039 | sort.Strings(l[ctrs:]) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1040 | return strings.Join(l, "\n") |
| 1041 | } |
| 1042 | |
| 1043 | // Methods and type that implement vc.Helper |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 1044 | // |
| 1045 | // We create a separate type for vc.Helper to hide the vc.Helper methods |
| 1046 | // from the exported method set of VIF. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1047 | type vcHelper struct{ vif *VIF } |
| 1048 | |
| 1049 | func (h vcHelper) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) { |
| 1050 | h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)}) |
| 1051 | } |
| 1052 | |
| 1053 | func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) { |
| 1054 | if bytes == 0 { |
| 1055 | return |
| 1056 | } |
| 1057 | h.vif.flowMu.Lock() |
| 1058 | h.vif.flowCounters.Add(vci, fid, uint32(bytes)) |
| 1059 | h.vif.flowMu.Unlock() |
| 1060 | h.vif.flowQ.TryPut(flowToken) |
| 1061 | } |
| 1062 | |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 1063 | func (h vcHelper) NewWriter(vci id.VC, fid id.Flow, priority bqueue.Priority) (bqueue.Writer, error) { |
Jungho Ahn | cd175b8 | 2015-03-27 14:29:40 -0700 | [diff] [blame] | 1064 | h.vif.idleTimerMap.InsertFlow(vci, fid) |
Jungho Ahn | 60408fa | 2015-03-27 15:28:22 -0700 | [diff] [blame] | 1065 | return h.vif.outgoing.NewWriter(packIDs(vci, fid), flowPriority+priority, defaultBytesBufferedPerFlow) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1066 | } |
| 1067 | |
| 1068 | // The token added to vif.flowQ. |
| 1069 | var flowToken *iobuf.Slice |
| 1070 | |
| 1071 | func init() { |
| 1072 | // flowToken must be non-empty otherwise bqueue.Writer.Put will ignore it. |
| 1073 | flowToken = iobuf.NewSlice(make([]byte, 1)) |
| 1074 | } |
| 1075 | |
| 1076 | func packIDs(vci id.VC, fid id.Flow) bqueue.ID { |
| 1077 | return bqueue.ID(message.MakeCounterID(vci, fid)) |
| 1078 | } |
| 1079 | |
| 1080 | func unpackIDs(b bqueue.ID) (id.VC, id.Flow) { |
| 1081 | cid := message.CounterID(b) |
| 1082 | return cid.VCI(), cid.Flow() |
| 1083 | } |
| 1084 | |
| 1085 | func releaseBufs(bufs []*iobuf.Slice) { |
| 1086 | for _, b := range bufs { |
| 1087 | b.Release() |
| 1088 | } |
| 1089 | } |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1090 | |
Asim Shankar | 7171a25 | 2015-03-07 14:41:40 -0800 | [diff] [blame] | 1091 | // localEP creates a naming.Endpoint from the provided parameters. |
| 1092 | // |
| 1093 | // It intentionally does not include any blessings (present in endpoints in the |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 1094 | // v5 format). At this point it is not clear whether the endpoint is being |
Asim Shankar | 7171a25 | 2015-03-07 14:41:40 -0800 | [diff] [blame] | 1095 | // created for a "client" or a "server". If the endpoint is used for clients |
| 1096 | // (i.e., for those sending an OpenVC message for example), then we do NOT want |
| 1097 | // to include the blessings in the endpoint to ensure client privacy. |
| 1098 | // |
| 1099 | // Servers should be happy to let anyone with access to their endpoint string |
| 1100 | // know their blessings, because they are willing to share those with anyone |
| 1101 | // that connects to them. |
| 1102 | // |
| 1103 | // The addition of the endpoints is left as an excercise to higher layers of |
| 1104 | // the stack, where the desire to share or hide blessings from the endpoint is |
| 1105 | // clearer. |
Matt Rosencrantz | 0e20717 | 2015-04-16 14:58:02 -0700 | [diff] [blame] | 1106 | func localEP(conn net.Conn, rid naming.RoutingID, versions *iversion.Range) naming.Endpoint { |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1107 | localAddr := conn.LocalAddr() |
Matt Rosencrantz | c16339c | 2015-04-23 10:47:06 -0700 | [diff] [blame] | 1108 | ep := &inaming.Endpoint{ |
| 1109 | Protocol: localAddr.Network(), |
| 1110 | Address: localAddr.String(), |
| 1111 | RID: rid, |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1112 | } |
| 1113 | return ep |
| 1114 | } |
| 1115 | |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 1116 | // getDialContext returns the DialContext for this call. |
| 1117 | func getDialContext(vopts []stream.VCOpt) *context.T { |
| 1118 | for _, o := range vopts { |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1119 | switch v := o.(type) { |
| 1120 | case vc.DialContext: |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 1121 | return v.T |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1122 | } |
| 1123 | } |
Suharsh Sivakumar | 2c5d810 | 2015-03-23 08:49:12 -0700 | [diff] [blame] | 1124 | return nil |
Ankur | 50a5f39 | 2015-02-27 18:46:30 -0800 | [diff] [blame] | 1125 | } |