Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package conn |
| 6 | |
| 7 | import ( |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 8 | "reflect" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 9 | "sync" |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 10 | "time" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 11 | |
| 12 | "v.io/v23" |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 13 | "v.io/v23/context" |
| 14 | "v.io/v23/flow" |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 15 | "v.io/v23/flow/message" |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 16 | "v.io/v23/naming" |
| 17 | "v.io/v23/rpc/version" |
| 18 | "v.io/v23/security" |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 19 | "v.io/v23/verror" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 20 | "v.io/x/ref/runtime/internal/flow/flowcontrol" |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 21 | ) |
| 22 | |
Matt Rosencrantz | 0516975 | 2015-07-31 11:08:28 -0700 | [diff] [blame] | 23 | // flowID is a number assigned to identify a flow. |
| 24 | // Each flow on a given conn will have a unique number. |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 25 | const ( |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 26 | invalidFlowID = iota |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 27 | blessingsFlowID |
| 28 | reservedFlows = 10 |
| 29 | ) |
| 30 | |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 31 | const mtu = 1 << 16 |
| 32 | const defaultBufferSize = 1 << 20 |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 33 | |
| 34 | const ( |
| 35 | expressPriority = iota |
| 36 | flowPriority |
| 37 | tearDownPriority |
| 38 | ) |
| 39 | |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 40 | // FlowHandlers process accepted flows. |
| 41 | type FlowHandler interface { |
| 42 | // HandleFlow processes an accepted flow. |
| 43 | HandleFlow(flow.Flow) error |
| 44 | } |
| 45 | |
| 46 | // Conns are a multiplexing encrypted channels that can host Flows. |
| 47 | type Conn struct { |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 48 | fc *flowcontrol.FlowController |
| 49 | mp *messagePipe |
| 50 | handler FlowHandler |
| 51 | version version.RPCVersion |
| 52 | lBlessings, rBlessings security.Blessings |
| 53 | local, remote naming.Endpoint |
| 54 | closed chan struct{} |
| 55 | blessingsFlow *blessingsFlow |
| 56 | loopWG sync.WaitGroup |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 57 | |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 58 | mu sync.Mutex |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 59 | nextFid uint64 |
| 60 | flows map[uint64]*flw |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 61 | dischargeTimer *time.Timer |
Suharsh Sivakumar | 79b3179 | 2015-08-24 15:39:15 -0700 | [diff] [blame] | 62 | lastUsedTime time.Time |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 63 | toRelease map[uint64]uint64 |
| 64 | borrowing map[uint64]bool |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 65 | } |
| 66 | |
Suharsh Sivakumar | d0aedf8 | 2015-08-27 13:14:19 -0700 | [diff] [blame] | 67 | // Ensure that *Conn implements flow.ManagedConn. |
| 68 | var _ flow.ManagedConn = &Conn{} |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 69 | |
| 70 | // NewDialed dials a new Conn on the given conn. |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 71 | func NewDialed( |
| 72 | ctx *context.T, |
Suharsh Sivakumar | 4932b8a | 2015-08-24 13:08:43 -0700 | [diff] [blame] | 73 | conn flow.MsgReadWriteCloser, |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 74 | local, remote naming.Endpoint, |
| 75 | versions version.RPCVersionRange, |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 76 | handler FlowHandler) (*Conn, error) { |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 77 | c := &Conn{ |
Suharsh Sivakumar | 79b3179 | 2015-08-24 15:39:15 -0700 | [diff] [blame] | 78 | fc: flowcontrol.New(defaultBufferSize, mtu), |
| 79 | mp: newMessagePipe(conn), |
| 80 | handler: handler, |
| 81 | lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(), |
| 82 | local: local, |
| 83 | remote: remote, |
| 84 | closed: make(chan struct{}), |
| 85 | nextFid: reservedFlows, |
| 86 | flows: map[uint64]*flw{}, |
| 87 | lastUsedTime: time.Now(), |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 88 | toRelease: map[uint64]uint64{}, |
| 89 | borrowing: map[uint64]bool{}, |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 90 | } |
| 91 | if err := c.dialHandshake(ctx, versions); err != nil { |
| 92 | c.Close(ctx, err) |
| 93 | return nil, err |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 94 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 95 | c.loopWG.Add(1) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 96 | go c.readLoop(ctx) |
| 97 | return c, nil |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 98 | } |
| 99 | |
| 100 | // NewAccepted accepts a new Conn on the given conn. |
| 101 | func NewAccepted( |
| 102 | ctx *context.T, |
Suharsh Sivakumar | 4932b8a | 2015-08-24 13:08:43 -0700 | [diff] [blame] | 103 | conn flow.MsgReadWriteCloser, |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 104 | local naming.Endpoint, |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 105 | versions version.RPCVersionRange, |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 106 | handler FlowHandler) (*Conn, error) { |
| 107 | c := &Conn{ |
Suharsh Sivakumar | 79b3179 | 2015-08-24 15:39:15 -0700 | [diff] [blame] | 108 | fc: flowcontrol.New(defaultBufferSize, mtu), |
| 109 | mp: newMessagePipe(conn), |
| 110 | handler: handler, |
| 111 | lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(), |
| 112 | local: local, |
| 113 | closed: make(chan struct{}), |
| 114 | nextFid: reservedFlows + 1, |
| 115 | flows: map[uint64]*flw{}, |
| 116 | lastUsedTime: time.Now(), |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 117 | toRelease: map[uint64]uint64{}, |
| 118 | borrowing: map[uint64]bool{}, |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 119 | } |
| 120 | if err := c.acceptHandshake(ctx, versions); err != nil { |
| 121 | c.Close(ctx, err) |
| 122 | return nil, err |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 123 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 124 | c.loopWG.Add(1) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 125 | go c.readLoop(ctx) |
| 126 | return c, nil |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 127 | } |
| 128 | |
| 129 | // Dial dials a new flow on the Conn. |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 130 | func (c *Conn) Dial(ctx *context.T, fn flow.BlessingsForPeer) (flow.Flow, error) { |
| 131 | if c.rBlessings.IsZero() { |
| 132 | return nil, NewErrDialingNonServer(ctx) |
| 133 | } |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 134 | rDischarges, err := c.blessingsFlow.getLatestDischarges(ctx, c.rBlessings) |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 135 | if err != nil { |
| 136 | return nil, err |
| 137 | } |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 138 | blessings, discharges, err := fn(ctx, c.local, c.remote, c.rBlessings, rDischarges) |
| 139 | if err != nil { |
| 140 | return nil, err |
| 141 | } |
| 142 | bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, discharges) |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 143 | if err != nil { |
| 144 | return nil, err |
| 145 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 146 | defer c.mu.Unlock() |
| 147 | c.mu.Lock() |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 148 | if c.flows == nil { |
| 149 | return nil, NewErrConnectionClosed(ctx) |
| 150 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 151 | id := c.nextFid |
| 152 | c.nextFid++ |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 153 | return c.newFlowLocked(ctx, id, bkey, dkey, true, false), nil |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 154 | } |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 155 | |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 156 | // LocalEndpoint returns the local vanadium Endpoint |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 157 | func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local } |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 158 | |
| 159 | // RemoteEndpoint returns the remote vanadium Endpoint |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 160 | func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote } |
Matt Rosencrantz | 0762591 | 2015-07-21 10:18:00 -0700 | [diff] [blame] | 161 | |
Suharsh Sivakumar | e0feb27 | 2015-09-14 18:03:48 -0700 | [diff] [blame] | 162 | // LocalBlessings returns the local blessings. |
| 163 | func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings } |
| 164 | |
| 165 | // RemoteBlessings returns the remote blessings. |
| 166 | func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings } |
| 167 | |
Suharsh Sivakumar | c0038c0 | 2015-09-09 12:50:06 -0700 | [diff] [blame] | 168 | // CommonVersion returns the RPCVersion negotiated between the local and remote endpoints. |
| 169 | func (c *Conn) CommonVersion() version.RPCVersion { return c.version } |
| 170 | |
Suharsh Sivakumar | 79b3179 | 2015-08-24 15:39:15 -0700 | [diff] [blame] | 171 | // LastUsedTime returns the time at which the Conn had bytes read or written on it. |
| 172 | func (c *Conn) LastUsedTime() time.Time { |
| 173 | defer c.mu.Unlock() |
| 174 | c.mu.Lock() |
| 175 | return c.lastUsedTime |
| 176 | } |
| 177 | |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 178 | // Closed returns a channel that will be closed after the Conn is shutdown. |
| 179 | // After this channel is closed it is guaranteed that all Dial calls will fail |
| 180 | // with an error and no more flows will be sent to the FlowHandler. |
| 181 | func (c *Conn) Closed() <-chan struct{} { return c.closed } |
| 182 | |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 183 | // Close shuts down a conn. |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 184 | func (c *Conn) Close(ctx *context.T, err error) { |
| 185 | c.mu.Lock() |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 186 | var flows map[uint64]*flw |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 187 | flows, c.flows = c.flows, nil |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 188 | if c.dischargeTimer != nil { |
| 189 | c.dischargeTimer.Stop() |
| 190 | c.dischargeTimer = nil |
| 191 | } |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 192 | c.mu.Unlock() |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 193 | |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 194 | if flows == nil { |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 195 | // This conn is already being torn down. |
| 196 | <-c.closed |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 197 | return |
| 198 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 199 | c.internalClose(ctx, err, flows) |
| 200 | } |
| 201 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 202 | func (c *Conn) internalClose(ctx *context.T, err error, flows map[uint64]*flw) { |
| 203 | ctx.VI(2).Infof("Closing connection: %v", err) |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 204 | if verror.ErrorID(err) != ErrConnClosedRemotely.ID { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 205 | msg := "" |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 206 | if err != nil { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 207 | msg = err.Error() |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 208 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 209 | cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 210 | return 0, true, c.mp.writeMsg(ctx, &message.TearDown{Message: msg}) |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 211 | }) |
| 212 | if cerr != nil { |
| 213 | ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr) |
| 214 | } |
| 215 | } |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 216 | for _, f := range flows { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 217 | f.close(ctx, NewErrConnectionClosed(ctx)) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 218 | } |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 219 | if cerr := c.mp.close(); cerr != nil { |
| 220 | ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 221 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 222 | c.loopWG.Wait() |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 223 | close(c.closed) |
| 224 | } |
| 225 | |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 226 | func (c *Conn) release(ctx *context.T, fid, count uint64) { |
| 227 | var toRelease map[uint64]uint64 |
| 228 | var release bool |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 229 | c.mu.Lock() |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 230 | c.toRelease[fid] += count |
| 231 | if c.borrowing[fid] { |
| 232 | c.toRelease[invalidFlowID] += count |
| 233 | release = c.toRelease[invalidFlowID] > defaultBufferSize/2 |
| 234 | } else { |
| 235 | release = c.toRelease[fid] > defaultBufferSize/2 |
| 236 | } |
| 237 | if release { |
| 238 | toRelease = c.toRelease |
| 239 | c.toRelease = make(map[uint64]uint64, len(c.toRelease)) |
| 240 | c.borrowing = make(map[uint64]bool, len(c.borrowing)) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 241 | } |
| 242 | c.mu.Unlock() |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 243 | |
| 244 | if toRelease == nil { |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 245 | return |
| 246 | } |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 247 | delete(toRelease, invalidFlowID) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 248 | |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 249 | err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 250 | err := c.mp.writeMsg(ctx, &message.Release{ |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 251 | Counters: toRelease, |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 252 | }) |
| 253 | return 0, true, err |
| 254 | }) |
| 255 | if err != nil { |
Matt Rosencrantz | b0c97f3 | 2015-08-08 08:34:41 -0700 | [diff] [blame] | 256 | c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err)) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 257 | } |
| 258 | } |
| 259 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 260 | func (c *Conn) handleMessage(ctx *context.T, m message.Message) error { |
| 261 | switch msg := m.(type) { |
| 262 | case *message.TearDown: |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 263 | return NewErrConnClosedRemotely(ctx, msg.Message) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 264 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 265 | case *message.OpenFlow: |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 266 | if c.handler == nil { |
| 267 | return NewErrUnexpectedMsg(ctx, "openFlow") |
| 268 | } |
| 269 | c.mu.Lock() |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 270 | f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true) |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 271 | f.worker.Release(ctx, int(msg.InitialCounters)) |
| 272 | c.toRelease[msg.ID] = defaultBufferSize |
| 273 | c.borrowing[msg.ID] = true |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 274 | c.mu.Unlock() |
| 275 | c.handler.HandleFlow(f) |
Matt Rosencrantz | bcaabf2 | 2015-09-11 14:08:53 -0700 | [diff] [blame] | 276 | if err := f.q.put(ctx, msg.Payload); err != nil { |
| 277 | return err |
| 278 | } |
| 279 | if msg.Flags&message.CloseFlag != 0 { |
| 280 | f.close(ctx, NewErrFlowClosedRemotely(f.ctx)) |
| 281 | } |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 282 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 283 | case *message.Release: |
| 284 | release := make([]flowcontrol.Release, 0, len(msg.Counters)) |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 285 | c.mu.Lock() |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 286 | for fid, val := range msg.Counters { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 287 | if f := c.flows[fid]; f != nil { |
| 288 | release = append(release, flowcontrol.Release{ |
| 289 | Worker: f.worker, |
| 290 | Tokens: int(val), |
| 291 | }) |
| 292 | } |
| 293 | } |
| 294 | c.mu.Unlock() |
| 295 | if err := c.fc.Release(ctx, release); err != nil { |
| 296 | return err |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 297 | } |
| 298 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 299 | case *message.Data: |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 300 | c.mu.Lock() |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 301 | f := c.flows[msg.ID] |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 302 | c.mu.Unlock() |
| 303 | if f == nil { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 304 | ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.ID) |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 305 | return nil |
| 306 | } |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 307 | if err := f.q.put(ctx, msg.Payload); err != nil { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 308 | return err |
| 309 | } |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 310 | if msg.Flags&message.CloseFlag != 0 { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 311 | f.close(ctx, NewErrFlowClosedRemotely(f.ctx)) |
| 312 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 313 | |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 314 | default: |
| 315 | return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String()) |
| 316 | } |
| 317 | return nil |
| 318 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 319 | |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 320 | func (c *Conn) readLoop(ctx *context.T) { |
| 321 | var err error |
| 322 | for { |
| 323 | msg, rerr := c.mp.readMsg(ctx) |
| 324 | if rerr != nil { |
| 325 | err = NewErrRecv(ctx, c.remote.String(), rerr) |
| 326 | break |
| 327 | } |
| 328 | if err = c.handleMessage(ctx, msg); err != nil { |
| 329 | break |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 330 | } |
| 331 | } |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 332 | |
| 333 | c.mu.Lock() |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 334 | var flows map[uint64]*flw |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 335 | flows, c.flows = c.flows, nil |
| 336 | c.mu.Unlock() |
| 337 | |
| 338 | c.loopWG.Done() |
| 339 | if flows != nil { |
| 340 | c.internalClose(ctx, err, flows) |
| 341 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 342 | } |
Suharsh Sivakumar | d9bffe8 | 2015-08-24 16:35:29 -0700 | [diff] [blame] | 343 | |
| 344 | func (c *Conn) markUsed() { |
| 345 | c.mu.Lock() |
| 346 | c.lastUsedTime = time.Now() |
| 347 | c.mu.Unlock() |
| 348 | } |
Suharsh Sivakumar | 2ebd6cf | 2015-09-22 16:37:31 -0700 | [diff] [blame] | 349 | |
| 350 | func (c *Conn) IsEncapsulated() bool { |
| 351 | _, ok := c.mp.rw.(*flw) |
| 352 | return ok |
| 353 | } |