Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 1 | package vif |
| 2 | |
| 3 | // Logging guidelines: |
| 4 | // vlog.VI(1) for per-net.Conn information |
| 5 | // vlog.VI(2) for per-VC information |
| 6 | // vlog.VI(3) for per-Flow information |
| 7 | |
| 8 | import ( |
| 9 | "bytes" |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 10 | "errors" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 11 | "fmt" |
| 12 | "net" |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 13 | "sort" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 14 | "strings" |
| 15 | "sync" |
| 16 | "time" |
| 17 | |
Suharsh Sivakumar | af862a5 | 2015-02-04 13:50:47 -0800 | [diff] [blame] | 18 | "v.io/core/veyron/runtimes/google/ipc/stream" |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 19 | "v.io/core/veyron/runtimes/google/ipc/stream/crypto" |
| 20 | "v.io/core/veyron/runtimes/google/ipc/stream/id" |
| 21 | "v.io/core/veyron/runtimes/google/ipc/stream/message" |
| 22 | "v.io/core/veyron/runtimes/google/ipc/stream/vc" |
| 23 | "v.io/core/veyron/runtimes/google/ipc/version" |
| 24 | "v.io/core/veyron/runtimes/google/lib/bqueue" |
| 25 | "v.io/core/veyron/runtimes/google/lib/bqueue/drrqueue" |
| 26 | "v.io/core/veyron/runtimes/google/lib/iobuf" |
| 27 | "v.io/core/veyron/runtimes/google/lib/pcqueue" |
| 28 | vsync "v.io/core/veyron/runtimes/google/lib/sync" |
| 29 | "v.io/core/veyron/runtimes/google/lib/upcqueue" |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 30 | "v.io/core/veyron2/naming" |
Todd Wang | ff73e1f | 2015-02-10 21:45:52 -0800 | [diff] [blame] | 31 | "v.io/core/veyron2/verror" |
Jiri Simsa | 764efb7 | 2014-12-25 20:57:03 -0800 | [diff] [blame] | 32 | "v.io/core/veyron2/vlog" |
| 33 | "v.io/core/veyron2/vtrace" |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 34 | ) |
| 35 | |
Mike Burrows | 35baad9 | 2015-02-10 13:42:53 -0800 | [diff] [blame] | 36 | const pkgPath = "v.io/core/veyron/runtimes/google/ipc/stream/vif" |
| 37 | |
| 38 | var ( |
| 39 | errShuttingDown = verror.Register(pkgPath+".errShuttingDown", verror.NoRetry, "{1:}{2:} underlying network connection({3}) shutting down{:_}") |
| 40 | ) |
| 41 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 42 | // VIF implements a "virtual interface" over an underlying network connection |
| 43 | // (net.Conn). Just like multiple network connections can be established over a |
| 44 | // single physical interface, multiple Virtual Circuits (VCs) can be |
| 45 | // established over a single VIF. |
| 46 | type VIF struct { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 47 | // All reads must be performed through reader, and not directly through conn. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 48 | conn net.Conn |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 49 | pool *iobuf.Pool |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 50 | reader *iobuf.Reader |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 51 | localEP naming.Endpoint |
| 52 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 53 | // control channel encryption. |
| 54 | isSetup bool |
| 55 | // ctrlCipher is normally guarded by writeMu, however see the exception in |
| 56 | // readLoop. |
| 57 | ctrlCipher crypto.ControlCipher |
| 58 | writeMu sync.Mutex |
| 59 | |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 60 | vcMap *vcMap |
Tilak Sharma | 6d7c39c | 2014-06-27 10:05:37 -0700 | [diff] [blame] | 61 | wpending, rpending vsync.WaitGroup |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 62 | |
| 63 | muListen sync.Mutex |
| 64 | acceptor *upcqueue.T // GUARDED_BY(muListen) |
| 65 | listenerOpts []stream.ListenerOpt // GUARDED_BY(muListen) |
| 66 | |
| 67 | muNextVCI sync.Mutex |
| 68 | nextVCI id.VC |
| 69 | |
| 70 | outgoing bqueue.T |
| 71 | expressQ bqueue.Writer |
| 72 | |
| 73 | flowQ bqueue.Writer |
| 74 | flowMu sync.Mutex |
| 75 | flowCounters message.Counters |
| 76 | |
| 77 | stopQ bqueue.Writer |
| 78 | |
| 79 | // The IPC version range supported by this VIF. In practice this is |
| 80 | // non-nil only in testing. nil is equivalent to using the versions |
| 81 | // actually supported by this IPC implementation (which is always |
| 82 | // what you want outside of tests). |
| 83 | versions *version.Range |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 84 | |
| 85 | isClosedMu sync.Mutex |
| 86 | isClosed bool // GUARDED_BY(isClosedMu) |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 87 | |
| 88 | // All sets that this VIF is in. |
| 89 | muSets sync.Mutex |
| 90 | sets []*Set // GUARDED_BY(muSets) |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 91 | |
| 92 | // These counters track the number of messages sent and received by |
| 93 | // this VIF. |
| 94 | muMsgCounters sync.Mutex |
| 95 | msgCounters map[string]int64 |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 96 | } |
| 97 | |
| 98 | // ConnectorAndFlow represents a Flow and the Connector that can be used to |
| 99 | // create another Flow over the same underlying VC. |
| 100 | type ConnectorAndFlow struct { |
| 101 | Connector stream.Connector |
| 102 | Flow stream.Flow |
| 103 | } |
| 104 | |
| 105 | // Separate out constants that are not exported so that godoc looks nicer for |
| 106 | // the exported ones. |
| 107 | const ( |
| 108 | // Priorities of the buffered queues used for flow control of writes. |
| 109 | expressPriority bqueue.Priority = iota |
| 110 | flowPriority |
| 111 | normalPriority |
| 112 | stopPriority |
| 113 | |
| 114 | // Convenience aliases so that the package name "vc" does not |
| 115 | // conflict with the variables named "vc". |
| 116 | defaultBytesBufferedPerFlow = vc.DefaultBytesBufferedPerFlow |
| 117 | sharedFlowID = vc.SharedFlowID |
| 118 | ) |
| 119 | |
// errAlreadySetup is returned by handleMessage when a HopSetup message
// arrives on a VIF that has already completed setup.
var errAlreadySetup = errors.New("VIF is already setup")
| 123 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 124 | // InternalNewDialedVIF creates a new virtual interface over the provided |
| 125 | // network connection, under the assumption that the conn object was created |
| 126 | // using net.Dial. |
| 127 | // |
| 128 | // As the name suggests, this method is intended for use only within packages |
| 129 | // placed inside veyron/runtimes/google. Code outside the |
| 130 | // veyron2/runtimes/google/* packages should never call this method. |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 131 | func InternalNewDialedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, opts ...stream.VCOpt) (*VIF, error) { |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 132 | ctx, principal, dc, err := clientAuthOptions(opts) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 133 | if err != nil { |
| 134 | return nil, err |
| 135 | } |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 136 | if ctx != nil { |
| 137 | var span vtrace.Span |
Matt Rosencrantz | 5f98d94 | 2015-01-08 13:48:30 -0800 | [diff] [blame] | 138 | ctx, span = vtrace.SetNewSpan(ctx, "InternalNewDialedVIF") |
Asim Shankar | f4864f4 | 2014-11-25 18:53:05 -0800 | [diff] [blame] | 139 | span.Annotatef("(%v, %v)", conn.RemoteAddr().Network(), conn.RemoteAddr()) |
| 140 | defer span.Finish() |
| 141 | } |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 142 | pool := iobuf.NewPool(0) |
| 143 | reader := iobuf.NewReader(pool, conn) |
| 144 | c, err := AuthenticateAsClient(ctx, conn, reader, versions, principal, dc) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 145 | if err != nil { |
| 146 | return nil, err |
| 147 | } |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 148 | return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs), versions, nil, nil, c) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 149 | } |
| 150 | |
| 151 | // InternalNewAcceptedVIF creates a new virtual interface over the provided |
| 152 | // network connection, under the assumption that the conn object was created |
Asim Shankar | 3d13387 | 2014-05-16 23:16:31 -0700 | [diff] [blame] | 153 | // using an Accept call on a net.Listener object. |
| 154 | // |
| 155 | // The returned VIF is also setup for accepting new VCs and Flows with the provided |
| 156 | // ListenerOpts. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 157 | // |
| 158 | // As the name suggests, this method is intended for use only within packages |
| 159 | // placed inside veyron/runtimes/google. Code outside the |
| 160 | // veyron/runtimes/google/* packages should never call this method. |
Asim Shankar | 3d13387 | 2014-05-16 23:16:31 -0700 | [diff] [blame] | 161 | func InternalNewAcceptedVIF(conn net.Conn, rid naming.RoutingID, versions *version.Range, lopts ...stream.ListenerOpt) (*VIF, error) { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 162 | pool := iobuf.NewPool(0) |
| 163 | reader := iobuf.NewReader(pool, conn) |
| 164 | return internalNew(conn, pool, reader, rid, id.VC(vc.NumReservedVCs)+1, versions, upcqueue.New(), lopts, &crypto.NullControlCipher{}) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 165 | } |
| 166 | |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 167 | func internalNew(conn net.Conn, pool *iobuf.Pool, reader *iobuf.Reader, rid naming.RoutingID, initialVCI id.VC, versions *version.Range, acceptor *upcqueue.T, listenerOpts []stream.ListenerOpt, c crypto.ControlCipher) (*VIF, error) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 168 | // Some cloud providers (like Google Compute Engine) seem to blackhole |
| 169 | // inactive TCP connections, set a TCP keep alive to prevent that. |
| 170 | // See: https://developers.google.com/compute/docs/troubleshooting#communicatewithinternet |
| 171 | if tcpconn, ok := conn.(*net.TCPConn); ok { |
| 172 | if err := tcpconn.SetKeepAlivePeriod(30 * time.Second); err != nil { |
| 173 | vlog.Errorf("Failed to set TCP keep alive: %v", err) |
| 174 | } else { |
| 175 | tcpconn.SetKeepAlive(true) |
| 176 | } |
| 177 | } |
| 178 | var ( |
| 179 | // Choose IDs that will not conflict with any other (VC, Flow) |
| 180 | // pairs. VCI 0 is never used by the application (it is |
| 181 | // reserved for control messages), so steal from the Flow space |
| 182 | // there. |
| 183 | expressID bqueue.ID = packIDs(0, 0) |
| 184 | flowID bqueue.ID = packIDs(0, 1) |
| 185 | stopID bqueue.ID = packIDs(0, 2) |
| 186 | ) |
| 187 | outgoing := drrqueue.New(vc.MaxPayloadSizeBytes) |
| 188 | |
| 189 | expressQ, err := outgoing.NewWriter(expressID, expressPriority, defaultBytesBufferedPerFlow) |
| 190 | if err != nil { |
| 191 | return nil, fmt.Errorf("failed to create bqueue.Writer for express messages: %v", err) |
| 192 | } |
| 193 | expressQ.Release(-1) // Disable flow control |
| 194 | |
| 195 | flowQ, err := outgoing.NewWriter(flowID, flowPriority, flowToken.Size()) |
| 196 | if err != nil { |
| 197 | return nil, fmt.Errorf("failed to create bqueue.Writer for flow control counters: %v", err) |
| 198 | } |
| 199 | flowQ.Release(-1) // Disable flow control |
| 200 | |
| 201 | stopQ, err := outgoing.NewWriter(stopID, stopPriority, 1) |
| 202 | if err != nil { |
| 203 | return nil, fmt.Errorf("failed to create bqueue.Writer for stopping the write loop: %v", err) |
| 204 | } |
| 205 | stopQ.Release(-1) // Disable flow control |
| 206 | |
| 207 | localAddr := conn.LocalAddr() |
| 208 | ep := version.Endpoint(localAddr.Network(), localAddr.String(), rid) |
| 209 | if versions != nil { |
| 210 | ep = versions.Endpoint(localAddr.Network(), localAddr.String(), rid) |
| 211 | } |
| 212 | vif := &VIF{ |
| 213 | conn: conn, |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 214 | pool: pool, |
| 215 | reader: reader, |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 216 | ctrlCipher: c, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 217 | vcMap: newVCMap(), |
Asim Shankar | 3d13387 | 2014-05-16 23:16:31 -0700 | [diff] [blame] | 218 | acceptor: acceptor, |
| 219 | listenerOpts: listenerOpts, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 220 | localEP: ep, |
| 221 | nextVCI: initialVCI, |
| 222 | outgoing: outgoing, |
| 223 | expressQ: expressQ, |
| 224 | flowQ: flowQ, |
| 225 | flowCounters: message.NewCounters(), |
| 226 | stopQ: stopQ, |
| 227 | versions: versions, |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 228 | msgCounters: make(map[string]int64), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 229 | } |
| 230 | go vif.readLoop() |
| 231 | go vif.writeLoop() |
| 232 | return vif, nil |
| 233 | } |
| 234 | |
| 235 | // Dial creates a new VC to the provided remote identity, authenticating the VC |
| 236 | // with the provided local identity. |
| 237 | func (vif *VIF) Dial(remoteEP naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) { |
Asim Shankar | 7b5c94a | 2014-10-28 21:42:56 -0700 | [diff] [blame] | 238 | vc, err := vif.newVC(vif.allocVCI(), vif.localEP, remoteEP, true) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 239 | if err != nil { |
| 240 | return nil, err |
| 241 | } |
| 242 | counters := message.NewCounters() |
| 243 | counters.Add(vc.VCI(), sharedFlowID, defaultBytesBufferedPerFlow) |
| 244 | err = vif.sendOnExpressQ(&message.OpenVC{ |
| 245 | VCI: vc.VCI(), |
| 246 | DstEndpoint: remoteEP, |
Asim Shankar | 7b5c94a | 2014-10-28 21:42:56 -0700 | [diff] [blame] | 247 | SrcEndpoint: vif.localEP, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 248 | Counters: counters}) |
| 249 | if err != nil { |
| 250 | err = fmt.Errorf("vif.sendOnExpressQ(OpenVC) failed: %v", err) |
| 251 | vc.Close(err.Error()) |
| 252 | return nil, err |
| 253 | } |
| 254 | if err := vc.HandshakeDialedVC(opts...); err != nil { |
| 255 | vif.vcMap.Delete(vc.VCI()) |
| 256 | err = fmt.Errorf("VC handshake failed: %v", err) |
| 257 | vc.Close(err.Error()) |
| 258 | return nil, err |
| 259 | } |
| 260 | return vc, nil |
| 261 | } |
| 262 | |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 263 | // addSet adds a set to the list of sets this VIF is in. This method is called |
| 264 | // by Set.Insert(). |
| 265 | func (vif *VIF) addSet(s *Set) { |
| 266 | vif.muSets.Lock() |
| 267 | defer vif.muSets.Unlock() |
| 268 | vif.sets = append(vif.sets, s) |
| 269 | } |
| 270 | |
| 271 | // removeSet removes a set from the list of sets this VIF is in. This method is |
| 272 | // called by Set.Delete(). |
| 273 | func (vif *VIF) removeSet(s *Set) { |
| 274 | vif.muSets.Lock() |
| 275 | defer vif.muSets.Unlock() |
| 276 | for ix, vs := range vif.sets { |
| 277 | if vs == s { |
| 278 | vif.sets = append(vif.sets[:ix], vif.sets[ix+1:]...) |
| 279 | return |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | } |
| 284 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 285 | // Close closes all VCs (and thereby Flows) over the VIF and then closes the |
| 286 | // underlying network connection after draining all pending writes on those |
| 287 | // VCs. |
| 288 | func (vif *VIF) Close() { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 289 | vif.isClosedMu.Lock() |
| 290 | if vif.isClosed { |
| 291 | vif.isClosedMu.Unlock() |
| 292 | return |
| 293 | } |
| 294 | vif.isClosed = true |
| 295 | vif.isClosedMu.Unlock() |
| 296 | |
Robin Thellend | 2224ffa | 2015-02-14 21:28:27 -0800 | [diff] [blame] | 297 | vif.muSets.Lock() |
| 298 | sets := vif.sets |
| 299 | vif.sets = nil |
| 300 | vif.muSets.Unlock() |
| 301 | for _, s := range sets { |
| 302 | s.Delete(vif) |
| 303 | } |
| 304 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 305 | vlog.VI(1).Infof("Closing VIF %s", vif) |
| 306 | // Stop accepting new VCs. |
| 307 | vif.StopAccepting() |
| 308 | // Close local datastructures for all existing VCs. |
| 309 | vcs := vif.vcMap.Freeze() |
| 310 | for _, vc := range vcs { |
| 311 | vc.VC.Close("VIF is being closed") |
| 312 | } |
| 313 | // Wait for the vcWriteLoops to exit (after draining queued up messages). |
| 314 | vif.stopQ.Close() |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 315 | vif.wpending.Wait() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 316 | // Close the underlying network connection. |
| 317 | // No need to send individual messages to close all pending VCs since |
| 318 | // the remote end should know to close all VCs when the VIF's |
| 319 | // connection breaks. |
| 320 | if err := vif.conn.Close(); err != nil { |
| 321 | vlog.VI(1).Infof("net.Conn.Close failed on VIF %s: %v", vif, err) |
| 322 | } |
| 323 | } |
| 324 | |
| 325 | // StartAccepting begins accepting Flows (and VCs) initiated by the remote end |
| 326 | // of a VIF. opts is used to setup the listener on newly established VCs. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 327 | func (vif *VIF) StartAccepting(opts ...stream.ListenerOpt) error { |
| 328 | vif.muListen.Lock() |
| 329 | defer vif.muListen.Unlock() |
| 330 | if vif.acceptor != nil { |
| 331 | return fmt.Errorf("already accepting Flows on VIF %v", vif) |
| 332 | } |
| 333 | vif.acceptor = upcqueue.New() |
| 334 | vif.listenerOpts = opts |
| 335 | return nil |
| 336 | } |
| 337 | |
| 338 | // StopAccepting prevents any Flows initiated by the remote end of a VIF from |
| 339 | // being accepted and causes any existing and future calls to Accept to fail |
| 340 | // immediately. |
| 341 | func (vif *VIF) StopAccepting() { |
| 342 | vif.muListen.Lock() |
| 343 | defer vif.muListen.Unlock() |
| 344 | if vif.acceptor != nil { |
| 345 | vif.acceptor.Shutdown() |
| 346 | vif.acceptor = nil |
| 347 | vif.listenerOpts = nil |
| 348 | } |
| 349 | } |
| 350 | |
| 351 | // Accept returns the (stream.Connector, stream.Flow) pair of a newly |
| 352 | // established VC and/or Flow. |
| 353 | // |
| 354 | // Sample usage: |
| 355 | // for { |
| 356 | // cAndf, err := vif.Accept() |
| 357 | // switch { |
| 358 | // case err != nil: |
| 359 | // fmt.Println("Accept error:", err) |
| 360 | // return |
| 361 | // case cAndf.Flow == nil: |
| 362 | // fmt.Println("New VC established:", cAndf.Connector) |
| 363 | // default: |
| 364 | // fmt.Println("New flow established") |
| 365 | // go handleFlow(cAndf.Flow) |
| 366 | // } |
| 367 | // } |
| 368 | func (vif *VIF) Accept() (ConnectorAndFlow, error) { |
| 369 | vif.muListen.Lock() |
| 370 | acceptor := vif.acceptor |
| 371 | vif.muListen.Unlock() |
| 372 | if acceptor == nil { |
| 373 | return ConnectorAndFlow{}, fmt.Errorf("VCs not accepted on VIF %v", vif) |
| 374 | } |
| 375 | item, err := acceptor.Get(nil) |
| 376 | if err != nil { |
| 377 | return ConnectorAndFlow{}, fmt.Errorf("Accept failed: %v", err) |
| 378 | } |
| 379 | return item.(ConnectorAndFlow), nil |
| 380 | } |
| 381 | |
| 382 | func (vif *VIF) String() string { |
| 383 | l := vif.conn.LocalAddr() |
| 384 | r := vif.conn.RemoteAddr() |
| 385 | return fmt.Sprintf("(%s, %s) <-> (%s, %s)", r.Network(), r, l.Network(), l) |
| 386 | } |
| 387 | |
| 388 | func (vif *VIF) readLoop() { |
| 389 | defer vif.Close() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 390 | defer vif.stopVCDispatchLoops() |
| 391 | for { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 392 | // vif.ctrlCipher is guarded by vif.writeMu. However, the only mutation |
| 393 | // to it is in handleMessage, which runs in the same goroutine, so a |
| 394 | // lock is not required here. |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 395 | msg, err := message.ReadFrom(vif.reader, vif.ctrlCipher) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 396 | if err != nil { |
| 397 | vlog.VI(1).Infof("Exiting readLoop of VIF %s because of read error: %v", vif, err) |
| 398 | return |
| 399 | } |
| 400 | vlog.VI(3).Infof("Received %T = [%v] on VIF %s", msg, msg, vif) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 401 | if err := vif.handleMessage(msg); err != nil { |
| 402 | vlog.VI(1).Infof("Exiting readLoop of VIF %s because of message error: %v", vif, err) |
| 403 | return |
| 404 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 405 | } |
| 406 | } |
| 407 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 408 | // handleMessage handles a single incoming message. Any error returned is |
| 409 | // fatal, causing the VIF to close. |
| 410 | func (vif *VIF) handleMessage(msg message.T) error { |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 411 | vif.muMsgCounters.Lock() |
| 412 | vif.msgCounters[fmt.Sprintf("Recv(%T)", msg)]++ |
| 413 | vif.muMsgCounters.Unlock() |
| 414 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 415 | switch m := msg.(type) { |
| 416 | case *message.Data: |
| 417 | _, rq, _ := vif.vcMap.Find(m.VCI) |
| 418 | if rq == nil { |
| 419 | vlog.VI(2).Infof("Ignoring message of %d bytes for unrecognized VCI %d on VIF %s", m.Payload.Size(), m.VCI, vif) |
| 420 | m.Release() |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 421 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 422 | } |
| 423 | if err := rq.Put(m, nil); err != nil { |
| 424 | vlog.VI(2).Infof("Failed to put message(%v) on VC queue on VIF %v: %v", m, vif, err) |
| 425 | m.Release() |
| 426 | } |
| 427 | case *message.OpenVC: |
| 428 | vif.muListen.Lock() |
| 429 | closed := vif.acceptor == nil || vif.acceptor.IsClosed() |
| 430 | lopts := vif.listenerOpts |
| 431 | vif.muListen.Unlock() |
| 432 | if closed { |
| 433 | vlog.VI(2).Infof("Ignoring OpenVC message %+v as VIF %s does not accept VCs", m, vif) |
| 434 | vif.sendOnExpressQ(&message.CloseVC{ |
| 435 | VCI: m.VCI, |
| 436 | Error: "VCs not accepted", |
| 437 | }) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 438 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 439 | } |
| 440 | vc, err := vif.newVC(m.VCI, m.DstEndpoint, m.SrcEndpoint, false) |
| 441 | vif.distributeCounters(m.Counters) |
| 442 | if err != nil { |
| 443 | vif.sendOnExpressQ(&message.CloseVC{ |
| 444 | VCI: m.VCI, |
| 445 | Error: err.Error(), |
| 446 | }) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 447 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 448 | } |
| 449 | go vif.acceptFlowsLoop(vc, vc.HandshakeAcceptedVC(lopts...)) |
| 450 | case *message.CloseVC: |
| 451 | if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil { |
| 452 | vif.vcMap.Delete(vc.VCI()) |
| 453 | vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif) |
Cosmos Nicolaou | 1534b3f | 2014-12-10 15:30:00 -0800 | [diff] [blame] | 454 | // TODO(cnicolaou): it would be nice to have a method on VC |
| 455 | // to indicate a 'remote close' rather than a 'local one'. This helps |
| 456 | // with error reporting since we expect reads/writes to occur |
| 457 | // after a remote close, but not after a local close. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 458 | vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error)) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 459 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 460 | } |
| 461 | vlog.VI(2).Infof("Ignoring CloseVC(%+v) for unrecognized VCI on VIF %s", m, vif) |
| 462 | case *message.AddReceiveBuffers: |
| 463 | vif.distributeCounters(m.Counters) |
| 464 | case *message.OpenFlow: |
| 465 | if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil { |
| 466 | if err := vc.AcceptFlow(m.Flow); err != nil { |
| 467 | vlog.VI(3).Infof("OpenFlow %+v on VIF %v failed:%v", m, vif, err) |
| 468 | cm := &message.Data{VCI: m.VCI, Flow: m.Flow} |
| 469 | cm.SetClose() |
| 470 | vif.sendOnExpressQ(cm) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 471 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 472 | } |
| 473 | vc.ReleaseCounters(m.Flow, m.InitialCounters) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 474 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 475 | } |
| 476 | vlog.VI(2).Infof("Ignoring OpenFlow(%+v) for unrecognized VCI on VIF %s", m, m, vif) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 477 | case *message.HopSetup: |
| 478 | // Configure the VIF. This takes over the conn during negotiation. |
| 479 | if vif.isSetup { |
| 480 | return errAlreadySetup |
| 481 | } |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 482 | vif.muListen.Lock() |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 483 | principal, lBlessings, dischargeClient, err := serverAuthOptions(vif.listenerOpts) |
Cosmos Nicolaou | 5a8a125 | 2014-12-01 14:14:25 -0800 | [diff] [blame] | 484 | vif.muListen.Unlock() |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 485 | if err != nil { |
| 486 | return errVersionNegotiationFailed |
| 487 | } |
| 488 | vif.writeMu.Lock() |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 489 | c, err := AuthenticateAsServer(vif.conn, vif.reader, vif.versions, principal, lBlessings, dischargeClient, m) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 490 | if err != nil { |
| 491 | vif.writeMu.Unlock() |
| 492 | return err |
| 493 | } |
| 494 | vif.ctrlCipher = c |
| 495 | vif.writeMu.Unlock() |
| 496 | vif.isSetup = true |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 497 | default: |
| 498 | vlog.Infof("Ignoring unrecognized message %T on VIF %s", m, vif) |
| 499 | } |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 500 | return nil |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 501 | } |
| 502 | |
| 503 | func (vif *VIF) vcDispatchLoop(vc *vc.VC, messages *pcqueue.T) { |
| 504 | defer vlog.VI(2).Infof("Exiting vcDispatchLoop(%v) on VIF %v", vc, vif) |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 505 | defer vif.rpending.Done() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 506 | for { |
| 507 | qm, err := messages.Get(nil) |
| 508 | if err != nil { |
| 509 | return |
| 510 | } |
| 511 | m := qm.(*message.Data) |
| 512 | if err := vc.DispatchPayload(m.Flow, m.Payload); err != nil { |
| 513 | vlog.VI(2).Infof("Ignoring data message %v for on VIF %v: %v", m, vif, err) |
| 514 | } |
| 515 | if m.Close() { |
| 516 | vif.shutdownFlow(vc, m.Flow) |
| 517 | } |
| 518 | } |
| 519 | } |
| 520 | |
| 521 | func (vif *VIF) stopVCDispatchLoops() { |
| 522 | vcs := vif.vcMap.Freeze() |
| 523 | for _, v := range vcs { |
| 524 | v.RQ.Close() |
| 525 | } |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 526 | vif.rpending.Wait() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 527 | } |
| 528 | |
| 529 | func (vif *VIF) acceptFlowsLoop(vc *vc.VC, c <-chan vc.HandshakeResult) { |
| 530 | hr := <-c |
| 531 | if hr.Error != nil { |
| 532 | vif.closeVCAndSendMsg(vc, hr.Error.Error()) |
| 533 | return |
| 534 | } |
| 535 | |
| 536 | vif.muListen.Lock() |
| 537 | acceptor := vif.acceptor |
| 538 | vif.muListen.Unlock() |
| 539 | if acceptor == nil { |
| 540 | vif.closeVCAndSendMsg(vc, "Flows no longer being accepted") |
| 541 | return |
| 542 | } |
| 543 | |
| 544 | // Notify any listeners that a new VC has been established |
| 545 | if err := acceptor.Put(ConnectorAndFlow{vc, nil}); err != nil { |
| 546 | vif.closeVCAndSendMsg(vc, fmt.Sprintf("VC accept failed: %v", err)) |
| 547 | return |
| 548 | } |
| 549 | |
| 550 | vlog.VI(2).Infof("Running acceptFlowsLoop for VC %v on VIF %v", vc, vif) |
| 551 | for { |
| 552 | f, err := hr.Listener.Accept() |
| 553 | if err != nil { |
Shyam Jayaraman | dbae76b | 2014-11-17 12:51:29 -0800 | [diff] [blame] | 554 | vlog.VI(2).Infof("Accept failed on VC %v on VIF %v: %v", vc, vif, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 555 | return |
| 556 | } |
| 557 | if err := acceptor.Put(ConnectorAndFlow{vc, f}); err != nil { |
| 558 | vlog.VI(2).Infof("vif.acceptor.Put(%v, %T) on VIF %v failed: %v", vc, f, vif, err) |
| 559 | f.Close() |
| 560 | return |
| 561 | } |
| 562 | } |
| 563 | } |
| 564 | |
| 565 | func (vif *VIF) distributeCounters(counters message.Counters) { |
| 566 | for cid, bytes := range counters { |
| 567 | vc, _, _ := vif.vcMap.Find(cid.VCI()) |
| 568 | if vc == nil { |
| 569 | vlog.VI(2).Infof("Ignoring counters for non-existent VCI %d on VIF %s", cid.VCI(), vif) |
| 570 | continue |
| 571 | } |
| 572 | vc.ReleaseCounters(cid.Flow(), bytes) |
| 573 | } |
| 574 | } |
| 575 | |
| 576 | func (vif *VIF) writeLoop() { |
| 577 | defer vif.outgoing.Close() |
| 578 | defer vif.stopVCWriteLoops() |
| 579 | for { |
| 580 | writer, bufs, err := vif.outgoing.Get(nil) |
| 581 | if err != nil { |
| 582 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because of bqueue.Get error: %v", vif, err) |
| 583 | return |
| 584 | } |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 585 | vif.muMsgCounters.Lock() |
| 586 | vif.msgCounters[fmt.Sprintf("Send(%T)", writer)]++ |
| 587 | vif.muMsgCounters.Unlock() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 588 | switch writer { |
| 589 | case vif.expressQ: |
| 590 | for _, b := range bufs { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 591 | if err := vif.writeSerializedMessage(b.Contents); err != nil { |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 592 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because Control message write failed: %s", vif, err) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 593 | releaseBufs(bufs) |
| 594 | return |
| 595 | } |
| 596 | b.Release() |
| 597 | } |
| 598 | case vif.flowQ: |
| 599 | msg := &message.AddReceiveBuffers{} |
| 600 | // No need to call releaseBufs(bufs) as all bufs are |
| 601 | // the exact same value: flowToken. |
| 602 | vif.flowMu.Lock() |
| 603 | if len(vif.flowCounters) > 0 { |
| 604 | msg.Counters = vif.flowCounters |
| 605 | vif.flowCounters = message.NewCounters() |
| 606 | } |
| 607 | vif.flowMu.Unlock() |
| 608 | if len(msg.Counters) > 0 { |
| 609 | vlog.VI(3).Infof("Sending counters %v on VIF %s", msg.Counters, vif) |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 610 | if err := vif.writeMessage(msg); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 611 | vlog.VI(1).Infof("Exiting writeLoop of VIF %s because AddReceiveBuffers message write failed: %v", vif, err) |
| 612 | return |
| 613 | } |
| 614 | } |
| 615 | case vif.stopQ: |
| 616 | // Lowest-priority queue which will never have any |
| 617 | // buffers, Close is the only method called on it. |
| 618 | return |
| 619 | default: |
| 620 | vif.writeDataMessages(writer, bufs) |
| 621 | } |
| 622 | } |
| 623 | } |
| 624 | |
| 625 | func (vif *VIF) vcWriteLoop(vc *vc.VC, messages *pcqueue.T) { |
| 626 | defer vlog.VI(2).Infof("Exiting vcWriteLoop(%v) on VIF %v", vc, vif) |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 627 | defer vif.wpending.Done() |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 628 | for { |
| 629 | qm, err := messages.Get(nil) |
| 630 | if err != nil { |
| 631 | return |
| 632 | } |
| 633 | m := qm.(*message.Data) |
| 634 | m.Payload, err = vc.Encrypt(m.Flow, m.Payload) |
| 635 | if err != nil { |
| 636 | vlog.Infof("Encryption failed. Flow:%v VC:%v Error:%v", m.Flow, vc, err) |
| 637 | } |
| 638 | if m.Close() { |
| 639 | // The last bytes written on the flow will be sent out |
| 640 | // on vif.conn. Local datastructures for the flow can |
| 641 | // be cleaned up now. |
| 642 | vif.shutdownFlow(vc, m.Flow) |
| 643 | } |
| 644 | if err == nil { |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 645 | err = vif.writeMessage(m) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 646 | } |
| 647 | if err != nil { |
Bogdan Caprita | ad5761f | 2014-09-23 10:56:23 -0700 | [diff] [blame] | 648 | // TODO(caprita): Calling closeVCAndSendMsg below causes |
| 649 | // a race as described in: |
| 650 | // https://docs.google.com/a/google.com/document/d/1C0kxfYhuOcStdV7tnLZELZpUhfQCZj47B0JrzbE29h8/edit |
| 651 | // |
| 652 | // There should be a finer grained way to fix this, and |
| 653 | // there are likely other instances where we should not |
| 654 | // be closing the VC. |
| 655 | // |
| 656 | // For now, commenting out the line below removes the |
| 657 | // flakiness from our existing unit tests, but this |
| 658 | // needs to be revisited and fixed correctly. |
| 659 | // |
| 660 | // vif.closeVCAndSendMsg(vc, fmt.Sprintf("write failure: %v", err)) |
| 661 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 662 | // Drain the queue and exit. |
| 663 | for { |
| 664 | qm, err := messages.Get(nil) |
| 665 | if err != nil { |
| 666 | return |
| 667 | } |
| 668 | qm.(*message.Data).Release() |
| 669 | } |
| 670 | } |
| 671 | } |
| 672 | } |
| 673 | |
| 674 | func (vif *VIF) stopVCWriteLoops() { |
| 675 | vcs := vif.vcMap.Freeze() |
| 676 | for _, v := range vcs { |
| 677 | v.WQ.Close() |
| 678 | } |
| 679 | } |
| 680 | |
| 681 | // sendOnExpressQ adds 'msg' to the expressQ (highest priority queue) of messages to write on the wire. |
| 682 | func (vif *VIF) sendOnExpressQ(msg message.T) error { |
Cosmos Nicolaou | 82d00d8 | 2015-02-10 21:31:00 -0800 | [diff] [blame] | 683 | vlog.VI(2).Infof("sendOnExpressQ(%T = %+v) on VIF %s", msg, msg, vif) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 684 | var buf bytes.Buffer |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 685 | // Don't encrypt yet, because the message ordering isn't yet determined. |
| 686 | // Encryption is performed by vif.writeSerializedMessage() when the |
| 687 | // message is actually written to vif.conn. |
| 688 | vif.writeMu.Lock() |
| 689 | c := vif.ctrlCipher |
| 690 | vif.writeMu.Unlock() |
| 691 | if err := message.WriteTo(&buf, msg, crypto.NewDisabledControlCipher(c)); err != nil { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 692 | return err |
| 693 | } |
| 694 | return vif.expressQ.Put(iobuf.NewSlice(buf.Bytes()), nil) |
| 695 | } |
| 696 | |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 697 | // writeMessage writes the message to the channel. Writes must be serialized so |
| 698 | // that the control channel can be encrypted, so we acquire the writeMu. |
| 699 | func (vif *VIF) writeMessage(msg message.T) error { |
| 700 | vif.writeMu.Lock() |
| 701 | defer vif.writeMu.Unlock() |
| 702 | return message.WriteTo(vif.conn, msg, vif.ctrlCipher) |
| 703 | } |
| 704 | |
| 705 | // Write writes the message to the channel, encrypting the control data. Writes |
| 706 | // must be serialized so that the control channel can be encrypted, so we |
| 707 | // acquire the writeMu. |
| 708 | func (vif *VIF) writeSerializedMessage(msg []byte) error { |
| 709 | vif.writeMu.Lock() |
| 710 | defer vif.writeMu.Unlock() |
| 711 | if err := message.EncryptMessage(msg, vif.ctrlCipher); err != nil { |
| 712 | return err |
| 713 | } |
| 714 | if n, err := vif.conn.Write(msg); err != nil { |
| 715 | return fmt.Errorf("write failed: got (%d, %v) for %d byte message", n, err, len(msg)) |
| 716 | } |
| 717 | return nil |
| 718 | } |
| 719 | |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 720 | func (vif *VIF) writeDataMessages(writer bqueue.Writer, bufs []*iobuf.Slice) { |
| 721 | vci, fid := unpackIDs(writer.ID()) |
| 722 | // iobuf.Coalesce will coalesce buffers only if they are adjacent to |
| 723 | // each other. In the worst case, each buf will be non-adjacent to the |
| 724 | // others and the code below will end up with multiple small writes |
| 725 | // instead of a single big one. |
| 726 | // Might want to investigate this and see if this needs to be |
| 727 | // revisited. |
| 728 | bufs = iobuf.Coalesce(bufs, uint(vc.MaxPayloadSizeBytes)) |
| 729 | _, _, wq := vif.vcMap.Find(vci) |
| 730 | if wq == nil { |
| 731 | // VC has been removed, stop sending messages |
| 732 | vlog.VI(2).Infof("VCI %d on VIF %s was shutdown, dropping %d messages that were pending a write", vci, vif, len(bufs)) |
| 733 | releaseBufs(bufs) |
| 734 | return |
| 735 | } |
| 736 | last := len(bufs) - 1 |
| 737 | drained := writer.IsDrained() |
| 738 | for i, b := range bufs { |
| 739 | d := &message.Data{VCI: vci, Flow: fid, Payload: b} |
| 740 | if drained && i == last { |
| 741 | d.SetClose() |
| 742 | } |
| 743 | if err := wq.Put(d, nil); err != nil { |
| 744 | releaseBufs(bufs[i:]) |
| 745 | return |
| 746 | } |
| 747 | } |
| 748 | if len(bufs) == 0 && drained { |
| 749 | d := &message.Data{VCI: vci, Flow: fid} |
| 750 | d.SetClose() |
| 751 | if err := wq.Put(d, nil); err != nil { |
| 752 | d.Release() |
| 753 | } |
| 754 | } |
| 755 | } |
| 756 | |
| 757 | func (vif *VIF) allocVCI() id.VC { |
| 758 | vif.muNextVCI.Lock() |
| 759 | ret := vif.nextVCI |
| 760 | vif.nextVCI += 2 |
| 761 | vif.muNextVCI.Unlock() |
| 762 | return ret |
| 763 | } |
| 764 | |
| 765 | func (vif *VIF) newVC(vci id.VC, localEP, remoteEP naming.Endpoint, dialed bool) (*vc.VC, error) { |
Ryan Brown | 407b732 | 2014-07-22 14:18:08 -0700 | [diff] [blame] | 766 | version, err := version.CommonVersion(localEP, remoteEP) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 767 | if vif.versions != nil { |
Ryan Brown | 407b732 | 2014-07-22 14:18:08 -0700 | [diff] [blame] | 768 | version, err = vif.versions.CommonVersion(localEP, remoteEP) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 769 | } |
| 770 | if err != nil { |
| 771 | return nil, err |
| 772 | } |
| 773 | vc := vc.InternalNew(vc.Params{ |
| 774 | VCI: vci, |
| 775 | Dialed: dialed, |
| 776 | LocalEP: localEP, |
| 777 | RemoteEP: remoteEP, |
| 778 | Pool: vif.pool, |
Jason Hickey | 96d30e8 | 2014-11-13 07:40:00 -0800 | [diff] [blame] | 779 | ReserveBytes: uint(message.HeaderSizeBytes + vif.ctrlCipher.MACSize()), |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 780 | Helper: vcHelper{vif}, |
Ryan Brown | 407b732 | 2014-07-22 14:18:08 -0700 | [diff] [blame] | 781 | Version: version, |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 782 | }) |
| 783 | added, rq, wq := vif.vcMap.Insert(vc) |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 784 | // Start vcWriteLoop |
| 785 | if added = added && vif.wpending.TryAdd(); added { |
| 786 | go vif.vcWriteLoop(vc, wq) |
| 787 | } |
| 788 | // Start vcDispatchLoop |
| 789 | if added = added && vif.rpending.TryAdd(); added { |
| 790 | go vif.vcDispatchLoop(vc, rq) |
| 791 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 792 | if !added { |
Asim Shankar | f578110 | 2014-06-26 22:05:35 -0700 | [diff] [blame] | 793 | if rq != nil { |
| 794 | rq.Close() |
| 795 | } |
| 796 | if wq != nil { |
| 797 | wq.Close() |
| 798 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 799 | vif.vcMap.Delete(vci) |
| 800 | vc.Close("underlying network connection shutting down") |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 801 | // We embed an error inside verror.ErrAborted because other layers |
Mike Burrows | 35baad9 | 2015-02-10 13:42:53 -0800 | [diff] [blame] | 802 | // check for the "Aborted" error as a special case. Perhaps |
| 803 | // eventually we'll get rid of the Aborted layer. |
Jiri Simsa | 074bf36 | 2015-02-17 09:29:45 -0800 | [diff] [blame] | 804 | return nil, verror.New(verror.ErrAborted, nil, verror.New(errShuttingDown, nil, vif)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 805 | } |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 806 | return vc, nil |
| 807 | } |
| 808 | |
| 809 | func (vif *VIF) closeVCAndSendMsg(vc *vc.VC, msg string) { |
| 810 | vlog.VI(2).Infof("Shutting down VCI %d on VIF %v due to: %v", vc.VCI(), vif, msg) |
| 811 | vif.vcMap.Delete(vc.VCI()) |
| 812 | vc.Close(msg) |
| 813 | if err := vif.sendOnExpressQ(&message.CloseVC{ |
| 814 | VCI: vc.VCI(), |
| 815 | Error: msg, |
| 816 | }); err != nil { |
| 817 | vlog.VI(2).Infof("sendOnExpressQ(CloseVC{VCI:%d,...}) on VIF %v failed: %v", vc.VCI(), vif, err) |
| 818 | } |
| 819 | } |
| 820 | |
| 821 | // shutdownFlow clears out all the datastructures associated with fid. |
| 822 | func (vif *VIF) shutdownFlow(vc *vc.VC, fid id.Flow) { |
| 823 | vc.ShutdownFlow(fid) |
| 824 | vif.flowMu.Lock() |
| 825 | delete(vif.flowCounters, message.MakeCounterID(vc.VCI(), fid)) |
| 826 | vif.flowMu.Unlock() |
| 827 | } |
| 828 | |
| 829 | // ShutdownVCs closes all VCs established to the provided remote endpoint. |
| 830 | // Returns the number of VCs that were closed. |
| 831 | func (vif *VIF) ShutdownVCs(remote naming.Endpoint) int { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 832 | vcs := vif.vcMap.List() |
| 833 | n := 0 |
| 834 | for _, vc := range vcs { |
Asim Shankar | 7cf2900 | 2014-10-09 00:38:37 -0700 | [diff] [blame] | 835 | if naming.Compare(vc.RemoteAddr().RoutingID(), remote.RoutingID()) { |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 836 | vlog.VI(1).Infof("VCI %d on VIF %s being closed because of ShutdownVCs call", vc.VCI(), vif) |
| 837 | vif.closeVCAndSendMsg(vc, "") |
| 838 | n++ |
| 839 | } |
| 840 | } |
| 841 | return n |
| 842 | } |
| 843 | |
| 844 | // NumVCs returns the number of VCs established over this VIF. |
| 845 | func (vif *VIF) NumVCs() int { return vif.vcMap.Size() } |
| 846 | |
| 847 | // DebugString returns a descriptive state of the VIF. |
| 848 | // |
| 849 | // The returned string is meant for consumptions by humans. The specific format |
| 850 | // should not be relied upon by any automated processing. |
| 851 | func (vif *VIF) DebugString() string { |
| 852 | vcs := vif.vcMap.List() |
| 853 | l := make([]string, 0, len(vcs)+1) |
| 854 | |
| 855 | vif.muNextVCI.Lock() // Needed for vif.nextVCI |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 856 | l = append(l, fmt.Sprintf("VIF:[%s] -- #VCs:%d NextVCI:%d ControlChannelEncryption:%v IsClosed:%v", vif, len(vcs), vif.nextVCI, vif.isSetup, vif.isClosed)) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 857 | vif.muNextVCI.Unlock() |
| 858 | |
| 859 | for _, vc := range vcs { |
| 860 | l = append(l, vc.DebugString()) |
| 861 | } |
Robin Thellend | 5bd7242 | 2015-02-17 12:36:38 -0800 | [diff] [blame^] | 862 | |
| 863 | l = append(l, "Message Counters:") |
| 864 | ctrs := len(l) |
| 865 | vif.muMsgCounters.Lock() |
| 866 | for k, v := range vif.msgCounters { |
| 867 | l = append(l, fmt.Sprintf(" %-32s %10d", k, v)) |
| 868 | } |
| 869 | vif.muMsgCounters.Unlock() |
| 870 | sort.Strings(l[ctrs:]) |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 871 | return strings.Join(l, "\n") |
| 872 | } |
| 873 | |
| 874 | // Methods and type that implement vc.Helper |
Jungho Ahn | 4b9a519 | 2015-02-02 13:11:08 -0800 | [diff] [blame] | 875 | // |
| 876 | // We create a separate type for vc.Helper to hide the vc.Helper methods |
| 877 | // from the exported method set of VIF. |
Jiri Simsa | 5293dcb | 2014-05-10 09:56:38 -0700 | [diff] [blame] | 878 | type vcHelper struct{ vif *VIF } |
| 879 | |
| 880 | func (h vcHelper) NotifyOfNewFlow(vci id.VC, fid id.Flow, bytes uint) { |
| 881 | h.vif.sendOnExpressQ(&message.OpenFlow{VCI: vci, Flow: fid, InitialCounters: uint32(bytes)}) |
| 882 | } |
| 883 | |
| 884 | func (h vcHelper) AddReceiveBuffers(vci id.VC, fid id.Flow, bytes uint) { |
| 885 | if bytes == 0 { |
| 886 | return |
| 887 | } |
| 888 | h.vif.flowMu.Lock() |
| 889 | h.vif.flowCounters.Add(vci, fid, uint32(bytes)) |
| 890 | h.vif.flowMu.Unlock() |
| 891 | h.vif.flowQ.TryPut(flowToken) |
| 892 | } |
| 893 | |
| 894 | func (h vcHelper) NewWriter(vci id.VC, fid id.Flow) (bqueue.Writer, error) { |
| 895 | return h.vif.outgoing.NewWriter(packIDs(vci, fid), normalPriority, defaultBytesBufferedPerFlow) |
| 896 | } |
| 897 | |
| 898 | // The token added to vif.flowQ. |
| 899 | var flowToken *iobuf.Slice |
| 900 | |
| 901 | func init() { |
| 902 | // flowToken must be non-empty otherwise bqueue.Writer.Put will ignore it. |
| 903 | flowToken = iobuf.NewSlice(make([]byte, 1)) |
| 904 | } |
| 905 | |
| 906 | func packIDs(vci id.VC, fid id.Flow) bqueue.ID { |
| 907 | return bqueue.ID(message.MakeCounterID(vci, fid)) |
| 908 | } |
| 909 | |
| 910 | func unpackIDs(b bqueue.ID) (id.VC, id.Flow) { |
| 911 | cid := message.CounterID(b) |
| 912 | return cid.VCI(), cid.Flow() |
| 913 | } |
| 914 | |
| 915 | func releaseBufs(bufs []*iobuf.Slice) { |
| 916 | for _, b := range bufs { |
| 917 | b.Release() |
| 918 | } |
| 919 | } |