blob: dd6ffc656ddbaa0f81dd28071a46ffed66f33f0e [file] [log] [blame]
Matt Rosencrantz07712e12015-07-31 18:45:25 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package conn
6
7import (
Matt Rosencrantz75250d32015-08-19 08:13:05 -07008 "strconv"
9
Matt Rosencrantz07712e12015-07-31 18:45:25 -070010 "v.io/v23/context"
11 "v.io/v23/flow"
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -070012 "v.io/v23/flow/message"
Matt Rosencrantz07712e12015-07-31 18:45:25 -070013 "v.io/v23/security"
Matt Rosencrantz79f68b42015-08-10 16:57:41 -070014 "v.io/v23/verror"
Matt Rosencrantz07712e12015-07-31 18:45:25 -070015 "v.io/x/ref/runtime/internal/flow/flowcontrol"
Matt Rosencrantz07712e12015-07-31 18:45:25 -070016)
17
18type flw struct {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -070019 id uint64
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070020 dialed bool
21 ctx *context.T
22 cancel context.CancelFunc
23 conn *Conn
24 worker *flowcontrol.Worker
25 opened bool
26 q *readq
27 bkey, dkey uint64
Matt Rosencrantz0ec053b2015-09-03 14:20:02 -070028 noEncrypt bool
Matt Rosencrantz07712e12015-07-31 18:45:25 -070029}
30
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070031// Ensure that *flw implements flow.Flow.
Matt Rosencrantz07712e12015-07-31 18:45:25 -070032var _ flow.Flow = &flw{}
33
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -070034func (c *Conn) newFlowLocked(ctx *context.T, id uint64, bkey, dkey uint64, dialed, preopen bool) *flw {
Matt Rosencrantz07712e12015-07-31 18:45:25 -070035 f := &flw{
36 id: id,
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070037 dialed: dialed,
Matt Rosencrantz07712e12015-07-31 18:45:25 -070038 conn: c,
Matt Rosencrantz75250d32015-08-19 08:13:05 -070039 worker: c.fc.NewWorker(strconv.FormatUint(uint64(id), 10), flowPriority),
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -070040 q: newReadQ(c, id),
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070041 bkey: bkey,
42 dkey: dkey,
43 opened: preopen,
Matt Rosencrantz07712e12015-07-31 18:45:25 -070044 }
Matt Rosencrantz99ce3742015-08-07 18:02:35 -070045 f.SetContext(ctx)
Matt Rosencrantz07712e12015-07-31 18:45:25 -070046 c.flows[id] = f
47 return f
48}
49
Matt Rosencrantz0ec053b2015-09-03 14:20:02 -070050// disableEncrytion should not be called concurrently with Write* methods.
51func (f *flw) disableEncryption() {
52 f.noEncrypt = false
53}
54
Matt Rosencrantz07712e12015-07-31 18:45:25 -070055// Implement io.Reader.
56// Read and ReadMsg should not be called concurrently with themselves
57// or each other.
58func (f *flw) Read(p []byte) (n int, err error) {
Suharsh Sivakumard9bffe82015-08-24 16:35:29 -070059 f.conn.markUsed()
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -070060 if n, err = f.q.read(f.ctx, p); err != nil {
Matt Rosencrantz79f68b42015-08-10 16:57:41 -070061 f.close(f.ctx, err)
62 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -070063 return
64}
65
66// ReadMsg is like read, but it reads bytes in chunks. Depending on the
67// implementation the batch boundaries might or might not be significant.
68// Read and ReadMsg should not be called concurrently with themselves
69// or each other.
70func (f *flw) ReadMsg() (buf []byte, err error) {
Suharsh Sivakumard9bffe82015-08-24 16:35:29 -070071 f.conn.markUsed()
Matt Rosencrantz99ce3742015-08-07 18:02:35 -070072 // TODO(mattr): Currently we only ever release counters when some flow
73 // reads. We may need to do it more or less often. Currently
74 // we'll send counters whenever a new flow is opened.
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -070075 if buf, err = f.q.get(f.ctx); err != nil {
Matt Rosencrantz79f68b42015-08-10 16:57:41 -070076 f.close(f.ctx, err)
77 }
Matt Rosencrantz99ce3742015-08-07 18:02:35 -070078 return
Matt Rosencrantz07712e12015-07-31 18:45:25 -070079}
80
81// Implement io.Writer.
82// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
83// with themselves or each other.
84func (f *flw) Write(p []byte) (n int, err error) {
85 return f.WriteMsg(p)
86}
87
88func (f *flw) writeMsg(alsoClose bool, parts ...[]byte) (int, error) {
Suharsh Sivakumard9bffe82015-08-24 16:35:29 -070089 f.conn.markUsed()
Matt Rosencrantz07712e12015-07-31 18:45:25 -070090 sent := 0
91 var left []byte
Matt Rosencrantz07712e12015-07-31 18:45:25 -070092 err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
Matt Rosencrantz07712e12015-07-31 18:45:25 -070093 size := 0
94 var bufs [][]byte
95 if len(left) > 0 {
96 size += len(left)
97 bufs = append(bufs, left)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -070098 left = nil
Matt Rosencrantz07712e12015-07-31 18:45:25 -070099 }
100 for size <= tokens && len(parts) > 0 {
101 bufs = append(bufs, parts[0])
102 size += len(parts[0])
103 parts = parts[1:]
104 }
105 if size > tokens {
106 lidx := len(bufs) - 1
107 last := bufs[lidx]
108 take := len(last) - (size - tokens)
109 bufs[lidx] = last[:take]
110 left = last[take:]
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700111 size = tokens
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700112 }
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700113 d := &message.Data{
114 ID: f.id,
115 Payload: bufs,
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700116 }
117 done := len(left) == 0 && len(parts) == 0
118 if alsoClose && done {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700119 d.Flags |= message.CloseFlag
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700120 }
Matt Rosencrantz0ec053b2015-09-03 14:20:02 -0700121 if f.noEncrypt {
122 d.Flags |= message.DisableEncryptionFlag
123 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700124 sent += size
Matt Rosencrantzbcaabf22015-09-11 14:08:53 -0700125
126 var err error
127 if f.opened {
128 err = f.conn.mp.writeMsg(f.ctx, d)
129 } else {
130 err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
131 ID: f.id,
132 InitialCounters: defaultBufferSize,
133 BlessingsKey: f.bkey,
134 DischargeKey: f.dkey,
135 Flags: d.Flags,
136 Payload: d.Payload,
137 })
138 f.opened = true
139 }
140 return size, done, err
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700141 })
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700142 if alsoClose || err != nil {
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700143 f.close(f.ctx, err)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700144 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700145 return sent, err
146}
147
148// WriteMsg is like Write, but allows writing more than one buffer at a time.
149// The data in each buffer is written sequentially onto the flow. Returns the
150// number of bytes written. WriteMsg must return a non-nil error if it writes
151// less than the total number of bytes from all buffers.
152// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
153// with themselves or each other.
154func (f *flw) WriteMsg(parts ...[]byte) (int, error) {
155 return f.writeMsg(false, parts...)
156}
157
158// WriteMsgAndClose performs WriteMsg and then closes the flow.
159// Write, WriteMsg, and WriteMsgAndClose should not be called concurrently
160// with themselves or each other.
161func (f *flw) WriteMsgAndClose(parts ...[]byte) (int, error) {
162 return f.writeMsg(true, parts...)
163}
164
165// SetContext sets the context associated with the flow. Typically this is
166// used to set state that is only available after the flow is connected, such
167// as a more restricted flow timeout, or the language of the request.
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700168// Calling SetContext may invalidate values previously returned from Closed.
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700169//
170// The flow.Manager associated with ctx must be the same flow.Manager that the
171// flow was dialed or accepted from, otherwise an error is returned.
172// TODO(mattr): enforce this restriction.
173//
174// TODO(mattr): update v23/flow documentation.
175// SetContext may not be called concurrently with other methods.
176func (f *flw) SetContext(ctx *context.T) error {
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700177 if f.cancel != nil {
178 f.cancel()
179 }
180 f.ctx, f.cancel = context.WithCancel(ctx)
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700181 return nil
182}
183
184// LocalBlessings returns the blessings presented by the local end of the flow
185// during authentication.
186func (f *flw) LocalBlessings() security.Blessings {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700187 if f.dialed {
188 blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
189 if err != nil {
190 f.conn.Close(f.ctx, err)
191 }
192 return blessings
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700193 }
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700194 return f.conn.lBlessings
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700195}
196
197// RemoteBlessings returns the blessings presented by the remote end of the
198// flow during authentication.
199func (f *flw) RemoteBlessings() security.Blessings {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700200 if !f.dialed {
201 blessings, _, err := f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
202 if err != nil {
203 f.conn.Close(f.ctx, err)
204 }
205 return blessings
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700206 }
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700207 return f.conn.rBlessings
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700208}
209
210// LocalDischarges returns the discharges presented by the local end of the
211// flow during authentication.
212//
213// Discharges are organized in a map keyed by the discharge-identifier.
214func (f *flw) LocalDischarges() map[string]security.Discharge {
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700215 var discharges map[string]security.Discharge
216 var err error
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700217 if f.dialed {
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700218 _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
219 } else {
220 discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.lBlessings)
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700221 }
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700222 if err != nil {
223 f.conn.Close(f.ctx, err)
224 }
225 return discharges
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700226}
227
228// RemoteDischarges returns the discharges presented by the remote end of the
229// flow during authentication.
230//
231// Discharges are organized in a map keyed by the discharge-identifier.
232func (f *flw) RemoteDischarges() map[string]security.Discharge {
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700233 var discharges map[string]security.Discharge
234 var err error
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700235 if !f.dialed {
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700236 _, discharges, err = f.conn.blessingsFlow.get(f.ctx, f.bkey, f.dkey)
237 } else {
238 discharges, err = f.conn.blessingsFlow.getLatestDischarges(f.ctx, f.conn.rBlessings)
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700239 }
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700240 if err != nil {
241 f.conn.Close(f.ctx, err)
242 }
243 return discharges
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700244}
245
246// Conn returns the connection the flow is multiplexed on.
Suharsh Sivakumard0aedf82015-08-27 13:14:19 -0700247func (f *flw) Conn() flow.ManagedConn {
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700248 return f.conn
249}
250
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700251// Closed returns a channel that remains open until the flow has been closed remotely
252// or the context attached to the flow has been canceled.
253//
254// Note that after the returned channel is closed starting new writes will result
255// in an error, but reads of previously queued data are still possible. No
256// new data will be queued.
257// TODO(mattr): update v23/flow docs.
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700258func (f *flw) Closed() <-chan struct{} {
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700259 return f.ctx.Done()
260}
261
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700262func (f *flw) close(ctx *context.T, err error) {
263 f.q.close(ctx)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700264 f.cancel()
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700265 if eid := verror.ErrorID(err); eid != ErrFlowClosedRemotely.ID &&
266 eid != ErrConnectionClosed.ID {
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700267 // We want to try to send this message even if ctx is already canceled.
268 ctx, cancel := context.WithRootCancel(ctx)
269 err := f.worker.Run(ctx, func(tokens int) (int, bool, error) {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700270 return 0, true, f.conn.mp.writeMsg(ctx, &message.Data{
271 ID: f.id,
272 Flags: message.CloseFlag,
273 })
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700274 })
275 if err != nil {
276 ctx.Errorf("Could not send close flow message: %v", err)
277 }
278 cancel()
279 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700280}
Suharsh Sivakumar4932b8a2015-08-24 13:08:43 -0700281
282// Close marks the flow as closed. After Close is called, new data cannot be
283// written on the flow. Reads of already queued data are still possible.
284func (f *flw) Close() error {
285 f.close(f.ctx, nil)
286 return nil
287}