Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 1 | // Copyright 2015 The Vanadium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package conn |
| 6 | |
| 7 | import ( |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 8 | "strconv" |
| 9 | |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 10 | "v.io/v23/context" |
| 11 | "v.io/v23/flow" |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 12 | "v.io/v23/flow/message" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 13 | "v.io/v23/security" |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 14 | "v.io/v23/verror" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 15 | "v.io/x/ref/runtime/internal/flow/flowcontrol" |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 16 | ) |
| 17 | |
| 18 | type flw struct { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 19 | id uint64 |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 20 | dialed bool |
| 21 | ctx *context.T |
| 22 | cancel context.CancelFunc |
| 23 | conn *Conn |
| 24 | worker *flowcontrol.Worker |
| 25 | opened bool |
| 26 | q *readq |
| 27 | bkey, dkey uint64 |
Matt Rosencrantz | 0ec053b | 2015-09-03 14:20:02 -0700 | [diff] [blame] | 28 | noEncrypt bool |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 29 | } |
| 30 | |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 31 | // Ensure that *flw implements flow.Flow. |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 32 | var _ flow.Flow = &flw{} |
| 33 | |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 34 | func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, dialed, preopen bool) *flw { |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 35 | f := &flw{ |
| 36 | id: id, |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 37 | dialed: dialed, |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 38 | conn: c, |
Matt Rosencrantz | 75250d3 | 2015-08-19 08:13:05 -0700 | [diff] [blame] | 39 | worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority), |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 40 | q: newReadQ(c, id), |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 41 | bkey: bkey, |
| 42 | dkey: dkey, |
| 43 | opened: preopen, |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 44 | } |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 45 | f.SetContext(ctx) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 46 | c.flows[id] = f |
| 47 | return f |
| 48 | } |
| 49 | |
Matt Rosencrantz | 0ec053b | 2015-09-03 14:20:02 -0700 | [diff] [blame] | 50 | // disableEncrytion should not be called concurrently with Write* methods. |
| 51 | func (f *flw) disableEncryption() { |
| 52 | f.noEncrypt = false |
| 53 | } |
| 54 | |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 55 | // Implement io.Reader. |
| 56 | // Read and ReadMsg should not be called concurrently with themselves |
| 57 | // or each other. |
| 58 | func (f *flw) Read(p []byte) (n int, err error) { |
Suharsh Sivakumar | d9bffe8 | 2015-08-24 16:35:29 -0700 | [diff] [blame] | 59 | f.conn.markUsed() |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 60 | if n, err = f.q.read(f.ctx, p); err != nil { |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 61 | f.close(f.ctx, err) |
| 62 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 63 | return |
| 64 | } |
| 65 | |
| 66 | // ReadMsg is like read, but it reads bytes in chunks. Depending on the |
| 67 | // implementation the batch boundaries might or might not be significant. |
| 68 | // Read and ReadMsg should not be called concurrently with themselves |
| 69 | // or each other. |
| 70 | func (f *flw) ReadMsg() (buf []byte, err error) { |
Suharsh Sivakumar | d9bffe8 | 2015-08-24 16:35:29 -0700 | [diff] [blame] | 71 | f.conn.markUsed() |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 72 | // TODO(mattr): Currently we only ever release counters when some flow |
| 73 | // reads. We may need to do it more or less often. Currently |
| 74 | // we'll send counters whenever a new flow is opened. |
Matt Rosencrantz | 8dc67f2 | 2015-09-09 18:26:50 -0700 | [diff] [blame] | 75 | if buf, err = f.q.get(f.ctx); err != nil { |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 76 | f.close(f.ctx, err) |
| 77 | } |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 78 | return |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 79 | } |
| 80 | |
| 81 | // Implement io.Writer. |
| 82 | // Write, WriteMsg, and WriteMsgAndClose should not be called concurrently |
| 83 | // with themselves or each other. |
| 84 | func (f *flw) Write(p []byte) (n int, err error) { |
| 85 | return f.WriteMsg(p) |
| 86 | } |
| 87 | |
| 88 | func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) { |
Suharsh Sivakumar | d9bffe8 | 2015-08-24 16:35:29 -0700 | [diff] [blame] | 89 | f.conn.markUsed() |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 90 | sent := 0 |
| 91 | var left []byte |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 92 | err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) { |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 93 | size := 0 |
| 94 | var bufs [][]byte |
| 95 | if len(left) > 0 { |
| 96 | size += len(left) |
| 97 | bufs = append(bufs, left) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 98 | left = nil |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 99 | } |
| 100 | for size <= tokens && len(parts) > 0 { |
| 101 | bufs = append(bufs, parts[0]) |
| 102 | size += len(parts[0]) |
| 103 | parts = parts[1:] |
| 104 | } |
| 105 | if size > tokens { |
| 106 | lidx := len(bufs) - 1 |
| 107 | last := bufs[lidx] |
| 108 | take := len(last) - (size - tokens) |
| 109 | bufs[lidx] = last[:take] |
| 110 | left = last[take:] |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 111 | size = tokens |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 112 | } |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 113 | d := &message.Data{ |
| 114 | ID: f.id, |
| 115 | Payload: bufs, |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 116 | } |
| 117 | done := len(left) == 0 && len(parts) == 0 |
| 118 | if alsoClose && done { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 119 | d.Flags |= message.CloseFlag |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 120 | } |
Matt Rosencrantz | 0ec053b | 2015-09-03 14:20:02 -0700 | [diff] [blame] | 121 | if f.noEncrypt { |
| 122 | d.Flags |= message.DisableEncryptionFlag |
| 123 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 124 | sent += size |
Matt Rosencrantz | bcaabf2 | 2015-09-11 14:08:53 -0700 | [diff] [blame] | 125 | |
| 126 | var err error |
| 127 | if f.opened { |
| 128 | err = f.conn.mp.writeMsg(f.ctx, d) |
| 129 | } else { |
| 130 | err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{ |
| 131 | ID: f.id, |
| 132 | InitialCounters: defaultBufferSize, |
| 133 | BlessingsKey: f.bkey, |
| 134 | DischargeKey: f.dkey, |
| 135 | Flags: d.Flags, |
| 136 | Payload: d.Payload, |
| 137 | }) |
| 138 | f.opened = true |
| 139 | } |
| 140 | return size, done, err |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 141 | }) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 142 | if alsoClose || err != nil { |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 143 | f.close(f.ctx, err) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 144 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 145 | return sent, err |
| 146 | } |
| 147 | |
| 148 | // WriteMsg is like Write, but allows writing more than one buffer at a time. |
| 149 | // The data in each buffer is written sequentially onto the flow. Returns the |
| 150 | // number of bytes written. WriteMsg must return a non-nil error if it writes |
| 151 | // less than the total number of bytes from all buffers. |
| 152 | // Write, WriteMsg, and WriteMsgAndClose should not be called concurrently |
| 153 | // with themselves or each other. |
| 154 | func (f *flw) WriteMsg(parts ...[]byte) (int, error) { |
| 155 | return f.writeMsg(false, parts...) |
| 156 | } |
| 157 | |
| 158 | // WriteMsgAndClose performs WriteMsg and then closes the flow. |
| 159 | // Write, WriteMsg, and WriteMsgAndClose should not be called concurrently |
| 160 | // with themselves or each other. |
| 161 | func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) { |
| 162 | return f.writeMsg(true, parts...) |
| 163 | } |
| 164 | |
| 165 | // SetContext sets the context associated with the flow. Typically this is |
| 166 | // used to set state that is only available after the flow is connected, such |
| 167 | // as a more restricted flow timeout, or the language of the request. |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 168 | // Calling SetContext may invalidate values previously returned from Closed. |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 169 | // |
| 170 | // The flow.Manager associated with ctx must be the same flow.Manager that the |
| 171 | // flow was dialed or accepted from, otherwise an error is returned. |
| 172 | // TODO(mattr): enforce this restriction. |
| 173 | // |
| 174 | // TODO(mattr): update v23/flow documentation. |
| 175 | // SetContext may not be called concurrently with other methods. |
| 176 | func (f *flw) SetContext(ctx *context.T) error { |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 177 | if f.cancel != nil { |
| 178 | f.cancel() |
| 179 | } |
| 180 | f.ctx, f.cancel = context.WithCancel(ctx) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 181 | return nil |
| 182 | } |
| 183 | |
| 184 | // LocalBlessings returns the blessings presented by the local end of the flow |
| 185 | // during authentication. |
| 186 | func (f *flw) LocalBlessings() security.Blessings { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 187 | if f.dialed { |
| 188 | blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey) |
| 189 | if err != nil { |
| 190 | f.conn.Close(f.ctx, err) |
| 191 | } |
| 192 | return blessings |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 193 | } |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 194 | return f.conn.lBlessings |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 195 | } |
| 196 | |
| 197 | // RemoteBlessings returns the blessings presented by the remote end of the |
| 198 | // flow during authentication. |
| 199 | func (f *flw) RemoteBlessings() security.Blessings { |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 200 | if !f.dialed { |
| 201 | blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey) |
| 202 | if err != nil { |
| 203 | f.conn.Close(f.ctx, err) |
| 204 | } |
| 205 | return blessings |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 206 | } |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 207 | return f.conn.rBlessings |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 208 | } |
| 209 | |
| 210 | // LocalDischarges returns the discharges presented by the local end of the |
| 211 | // flow during authentication. |
| 212 | // |
| 213 | // Discharges are organized in a map keyed by the discharge-identifier. |
| 214 | func (f *flw) LocalDischarges() map[string]security.Discharge { |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 215 | var discharges map[string]security.Discharge |
| 216 | var err error |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 217 | if f.dialed { |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 218 | _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey) |
| 219 | } else { |
| 220 | discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.lBlessings) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 221 | } |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 222 | if err != nil { |
| 223 | f.conn.Close(f.ctx, err) |
| 224 | } |
| 225 | return discharges |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 226 | } |
| 227 | |
| 228 | // RemoteDischarges returns the discharges presented by the remote end of the |
| 229 | // flow during authentication. |
| 230 | // |
| 231 | // Discharges are organized in a map keyed by the discharge-identifier. |
| 232 | func (f *flw) RemoteDischarges() map[string]security.Discharge { |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 233 | var discharges map[string]security.Discharge |
| 234 | var err error |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 235 | if !f.dialed { |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 236 | _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey) |
| 237 | } else { |
| 238 | discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.rBlessings) |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 239 | } |
Matt Rosencrantz | 570163b | 2015-08-20 12:44:35 -0700 | [diff] [blame] | 240 | if err != nil { |
| 241 | f.conn.Close(f.ctx, err) |
| 242 | } |
| 243 | return discharges |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 244 | } |
| 245 | |
| 246 | // Conn returns the connection the flow is multiplexed on. |
Suharsh Sivakumar | d0aedf8 | 2015-08-27 13:14:19 -0700 | [diff] [blame] | 247 | func (f *flw) Conn() flow.ManagedConn { |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 248 | return f.conn |
| 249 | } |
| 250 | |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 251 | // Closed returns a channel that remains open until the flow has been closed remotely |
| 252 | // or the context attached to the flow has been canceled. |
| 253 | // |
| 254 | // Note that after the returned channel is closed starting new writes will result |
| 255 | // in an error, but reads of previously queued data are still possible. No |
| 256 | // new data will be queued. |
| 257 | // TODO(mattr): update v23/flow docs. |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 258 | func (f *flw) Closed() <-chan struct{} { |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 259 | return f.ctx.Done() |
| 260 | } |
| 261 | |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 262 | func (f *flw) close(ctx *context.T, err error) { |
| 263 | f.q.close(ctx) |
Matt Rosencrantz | 99ce374 | 2015-08-07 18:02:35 -0700 | [diff] [blame] | 264 | f.cancel() |
Matt Rosencrantz | b9af7a8 | 2015-08-17 17:41:29 -0700 | [diff] [blame] | 265 | if eid := verror.ErrorID(err); eid != ErrFlowClosedRemotely.ID && |
| 266 | eid != ErrConnectionClosed.ID { |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 267 | // We want to try to send this message even if ctx is already canceled. |
| 268 | ctx, cancel := context.WithRootCancel(ctx) |
| 269 | err := f.worker.Run(ctx, func(tokens int) (int, bool, error) { |
Matt Rosencrantz | 8e8f1dd | 2015-08-21 22:26:21 -0700 | [diff] [blame] | 270 | return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{ |
| 271 | ID: f.id, |
| 272 | Flags: message.CloseFlag, |
| 273 | }) |
Matt Rosencrantz | 79f68b4 | 2015-08-10 16:57:41 -0700 | [diff] [blame] | 274 | }) |
| 275 | if err != nil { |
| 276 | ctx.Errorf("Could not send close flow message: %v", err) |
| 277 | } |
| 278 | cancel() |
| 279 | } |
Matt Rosencrantz | 07712e1 | 2015-07-31 18:45:25 -0700 | [diff] [blame] | 280 | } |
Suharsh Sivakumar | 4932b8a | 2015-08-24 13:08:43 -0700 | [diff] [blame] | 281 | |
| 282 | // Close marks the flow as closed. After Close is called, new data cannot be |
| 283 | // written on the flow. Reads of already queued data are still possible. |
| 284 | func (f *flw) Close() error { |
| 285 | f.close(f.ctx, nil) |
| 286 | return nil |
| 287 | } |