blob: ecbd8a9e9cede4a4c57678f68688a6d0b36ca39c [file] [log] [blame]
Matt Rosencrantz07625912015-07-21 10:18:00 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package conn
6
7import (
Matt Rosencrantz99ce3742015-08-07 18:02:35 -07008 "reflect"
Matt Rosencrantz07712e12015-07-31 18:45:25 -07009 "sync"
Matt Rosencrantz570163b2015-08-20 12:44:35 -070010 "time"
Matt Rosencrantz07712e12015-07-31 18:45:25 -070011
12 "v.io/v23"
Matt Rosencrantz07625912015-07-21 10:18:00 -070013 "v.io/v23/context"
14 "v.io/v23/flow"
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -070015 "v.io/v23/flow/message"
Matt Rosencrantz07625912015-07-21 10:18:00 -070016 "v.io/v23/naming"
17 "v.io/v23/rpc/version"
18 "v.io/v23/security"
Matt Rosencrantz79f68b42015-08-10 16:57:41 -070019 "v.io/v23/verror"
Matt Rosencrantz07712e12015-07-31 18:45:25 -070020 "v.io/x/ref/runtime/internal/flow/flowcontrol"
Matt Rosencrantz07625912015-07-21 10:18:00 -070021)
22
// Flow IDs with a fixed meaning on every Conn.
const (
	// invalidFlowID never names a real flow. release uses it as the key
	// under which it accumulates the counters of all borrowing flows, and
	// removes that key again before the counters are sent to the peer.
	invalidFlowID = iota
	// blessingsFlowID is the second reserved ID; presumably the flow over
	// which blessings are exchanged (not used in this part of the file).
	blessingsFlowID
)

// reservedFlows is the first flow ID handed out to flows created with Dial:
// NewDialed starts numbering at reservedFlows and NewAccepted at
// reservedFlows+1.
const reservedFlows = 10
30
const (
	// mtu is the second argument given to flowcontrol.New when a Conn is
	// created (64 KiB).
	mtu = 1 << 16
	// defaultBufferSize is the first argument given to flowcontrol.New and
	// the number of counters granted for a flow opened by the peer (1 MiB).
	// release sends accumulated counters once more than half of this amount
	// is pending.
	defaultBufferSize = 1 << 20
)
Matt Rosencrantz07712e12015-07-31 18:45:25 -070033
// Priorities passed to the flow controller when scheduling a write.
const (
	// expressPriority is used for the connection-level control messages
	// written by this file (tear down and release).
	expressPriority = iota
	// flowPriority is presumably used for ordinary flow data; it is not
	// referenced in this part of the file.
	flowPriority
	// tearDownPriority is not referenced in this part of the file.
	tearDownPriority
)
39
Matt Rosencrantz07625912015-07-21 10:18:00 -070040// FlowHandlers process accepted flows.
41type FlowHandler interface {
42 // HandleFlow processes an accepted flow.
43 HandleFlow(flow.Flow) error
44}
45
46// Conns are a multiplexing encrypted channels that can host Flows.
47type Conn struct {
Matt Rosencrantz570163b2015-08-20 12:44:35 -070048 fc *flowcontrol.FlowController
49 mp *messagePipe
50 handler FlowHandler
51 version version.RPCVersion
52 lBlessings, rBlessings security.Blessings
53 local, remote naming.Endpoint
54 closed chan struct{}
55 blessingsFlow *blessingsFlow
56 loopWG sync.WaitGroup
Matt Rosencrantz07712e12015-07-31 18:45:25 -070057
Matt Rosencrantz570163b2015-08-20 12:44:35 -070058 mu sync.Mutex
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -070059 nextFid uint64
60 flows map[uint64]*flw
Matt Rosencrantz570163b2015-08-20 12:44:35 -070061 dischargeTimer *time.Timer
Suharsh Sivakumar79b31792015-08-24 15:39:15 -070062 lastUsedTime time.Time
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -070063 toRelease map[uint64]uint64
64 borrowing map[uint64]bool
Matt Rosencrantz07625912015-07-21 10:18:00 -070065}
66
Suharsh Sivakumard0aedf82015-08-27 13:14:19 -070067// Ensure that *Conn implements flow.ManagedConn.
68var _ flow.ManagedConn = &Conn{}
Matt Rosencrantz07625912015-07-21 10:18:00 -070069
70// NewDialed dials a new Conn on the given conn.
Matt Rosencrantz07625912015-07-21 10:18:00 -070071func NewDialed(
72 ctx *context.T,
Suharsh Sivakumar4932b8a2015-08-24 13:08:43 -070073 conn flow.MsgReadWriteCloser,
Matt Rosencrantz07625912015-07-21 10:18:00 -070074 local, remote naming.Endpoint,
75 versions version.RPCVersionRange,
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070076 handler FlowHandler) (*Conn, error) {
Matt Rosencrantz07712e12015-07-31 18:45:25 -070077 c := &Conn{
Suharsh Sivakumar79b31792015-08-24 15:39:15 -070078 fc: flowcontrol.New(defaultBufferSize, mtu),
79 mp: newMessagePipe(conn),
80 handler: handler,
81 lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
82 local: local,
83 remote: remote,
84 closed: make(chan struct{}),
85 nextFid: reservedFlows,
86 flows: map[uint64]*flw{},
87 lastUsedTime: time.Now(),
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -070088 toRelease: map[uint64]uint64{},
89 borrowing: map[uint64]bool{},
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -070090 }
91 if err := c.dialHandshake(ctx, versions); err != nil {
92 c.Close(ctx, err)
93 return nil, err
Matt Rosencrantz07712e12015-07-31 18:45:25 -070094 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -070095 c.loopWG.Add(1)
Matt Rosencrantz07712e12015-07-31 18:45:25 -070096 go c.readLoop(ctx)
97 return c, nil
Matt Rosencrantz07625912015-07-21 10:18:00 -070098}
99
100// NewAccepted accepts a new Conn on the given conn.
101func NewAccepted(
102 ctx *context.T,
Suharsh Sivakumar4932b8a2015-08-24 13:08:43 -0700103 conn flow.MsgReadWriteCloser,
Matt Rosencrantz07625912015-07-21 10:18:00 -0700104 local naming.Endpoint,
Matt Rosencrantz07625912015-07-21 10:18:00 -0700105 versions version.RPCVersionRange,
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700106 handler FlowHandler) (*Conn, error) {
107 c := &Conn{
Suharsh Sivakumar79b31792015-08-24 15:39:15 -0700108 fc: flowcontrol.New(defaultBufferSize, mtu),
109 mp: newMessagePipe(conn),
110 handler: handler,
111 lBlessings: v23.GetPrincipal(ctx).BlessingStore().Default(),
112 local: local,
113 closed: make(chan struct{}),
114 nextFid: reservedFlows + 1,
115 flows: map[uint64]*flw{},
116 lastUsedTime: time.Now(),
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700117 toRelease: map[uint64]uint64{},
118 borrowing: map[uint64]bool{},
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700119 }
120 if err := c.acceptHandshake(ctx, versions); err != nil {
121 c.Close(ctx, err)
122 return nil, err
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700123 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700124 c.loopWG.Add(1)
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700125 go c.readLoop(ctx)
126 return c, nil
Matt Rosencrantz07625912015-07-21 10:18:00 -0700127}
128
129// Dial dials a new flow on the Conn.
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700130func (c *Conn) Dial(ctx *context.T, fn flow.BlessingsForPeer) (flow.Flow, error) {
131 if c.rBlessings.IsZero() {
132 return nil, NewErrDialingNonServer(ctx)
133 }
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700134 rDischarges, err := c.blessingsFlow.getLatestDischarges(ctx, c.rBlessings)
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700135 if err != nil {
136 return nil, err
137 }
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700138 blessings, discharges, err := fn(ctx, c.local, c.remote, c.rBlessings, rDischarges)
139 if err != nil {
140 return nil, err
141 }
142 bkey, dkey, err := c.blessingsFlow.put(ctx, blessings, discharges)
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700143 if err != nil {
144 return nil, err
145 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700146 defer c.mu.Unlock()
147 c.mu.Lock()
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700148 if c.flows == nil {
149 return nil, NewErrConnectionClosed(ctx)
150 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700151 id := c.nextFid
152 c.nextFid++
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700153 return c.newFlowLocked(ctx, id, bkey, dkey, true, false), nil
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700154}
Matt Rosencrantz07625912015-07-21 10:18:00 -0700155
Matt Rosencrantz07625912015-07-21 10:18:00 -0700156// LocalEndpoint returns the local vanadium Endpoint
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700157func (c *Conn) LocalEndpoint() naming.Endpoint { return c.local }
Matt Rosencrantz07625912015-07-21 10:18:00 -0700158
159// RemoteEndpoint returns the remote vanadium Endpoint
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700160func (c *Conn) RemoteEndpoint() naming.Endpoint { return c.remote }
Matt Rosencrantz07625912015-07-21 10:18:00 -0700161
Suharsh Sivakumare0feb272015-09-14 18:03:48 -0700162// LocalBlessings returns the local blessings.
163func (c *Conn) LocalBlessings() security.Blessings { return c.lBlessings }
164
165// RemoteBlessings returns the remote blessings.
166func (c *Conn) RemoteBlessings() security.Blessings { return c.rBlessings }
167
Suharsh Sivakumarc0038c02015-09-09 12:50:06 -0700168// CommonVersion returns the RPCVersion negotiated between the local and remote endpoints.
169func (c *Conn) CommonVersion() version.RPCVersion { return c.version }
170
Suharsh Sivakumar79b31792015-08-24 15:39:15 -0700171// LastUsedTime returns the time at which the Conn had bytes read or written on it.
172func (c *Conn) LastUsedTime() time.Time {
173 defer c.mu.Unlock()
174 c.mu.Lock()
175 return c.lastUsedTime
176}
177
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700178// Closed returns a channel that will be closed after the Conn is shutdown.
179// After this channel is closed it is guaranteed that all Dial calls will fail
180// with an error and no more flows will be sent to the FlowHandler.
181func (c *Conn) Closed() <-chan struct{} { return c.closed }
182
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700183// Close shuts down a conn.
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700184func (c *Conn) Close(ctx *context.T, err error) {
185 c.mu.Lock()
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700186 var flows map[uint64]*flw
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700187 flows, c.flows = c.flows, nil
Matt Rosencrantz570163b2015-08-20 12:44:35 -0700188 if c.dischargeTimer != nil {
189 c.dischargeTimer.Stop()
190 c.dischargeTimer = nil
191 }
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700192 c.mu.Unlock()
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700193
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700194 if flows == nil {
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700195 // This conn is already being torn down.
196 <-c.closed
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700197 return
198 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700199 c.internalClose(ctx, err, flows)
200}
201
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700202func (c *Conn) internalClose(ctx *context.T, err error, flows map[uint64]*flw) {
203 ctx.VI(2).Infof("Closing connection: %v", err)
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700204 if verror.ErrorID(err) != ErrConnClosedRemotely.ID {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700205 msg := ""
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700206 if err != nil {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700207 msg = err.Error()
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700208 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700209 cerr := c.fc.Run(ctx, "close", expressPriority, func(_ int) (int, bool, error) {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700210 return 0, true, c.mp.writeMsg(ctx, &message.TearDown{Message: msg})
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700211 })
212 if cerr != nil {
213 ctx.Errorf("Error sending tearDown on connection to %s: %v", c.remote, cerr)
214 }
215 }
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700216 for _, f := range flows {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700217 f.close(ctx, NewErrConnectionClosed(ctx))
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700218 }
Matt Rosencrantz79f68b42015-08-10 16:57:41 -0700219 if cerr := c.mp.close(); cerr != nil {
220 ctx.Errorf("Error closing underlying connection for %s: %v", c.remote, cerr)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700221 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700222 c.loopWG.Wait()
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700223 close(c.closed)
224}
225
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700226func (c *Conn) release(ctx *context.T, fid, count uint64) {
227 var toRelease map[uint64]uint64
228 var release bool
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700229 c.mu.Lock()
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700230 c.toRelease[fid] += count
231 if c.borrowing[fid] {
232 c.toRelease[invalidFlowID] += count
233 release = c.toRelease[invalidFlowID] > defaultBufferSize/2
234 } else {
235 release = c.toRelease[fid] > defaultBufferSize/2
236 }
237 if release {
238 toRelease = c.toRelease
239 c.toRelease = make(map[uint64]uint64, len(c.toRelease))
240 c.borrowing = make(map[uint64]bool, len(c.borrowing))
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700241 }
242 c.mu.Unlock()
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700243
244 if toRelease == nil {
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700245 return
246 }
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700247 delete(toRelease, invalidFlowID)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700248
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700249 err := c.fc.Run(ctx, "release", expressPriority, func(_ int) (int, bool, error) {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700250 err := c.mp.writeMsg(ctx, &message.Release{
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700251 Counters: toRelease,
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700252 })
253 return 0, true, err
254 })
255 if err != nil {
Matt Rosencrantzb0c97f32015-08-08 08:34:41 -0700256 c.Close(ctx, NewErrSend(ctx, "release", c.remote.String(), err))
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700257 }
258}
259
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700260func (c *Conn) handleMessage(ctx *context.T, m message.Message) error {
261 switch msg := m.(type) {
262 case *message.TearDown:
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700263 return NewErrConnClosedRemotely(ctx, msg.Message)
Matt Rosencrantz99ce3742015-08-07 18:02:35 -0700264
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700265 case *message.OpenFlow:
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700266 if c.handler == nil {
267 return NewErrUnexpectedMsg(ctx, "openFlow")
268 }
269 c.mu.Lock()
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700270 f := c.newFlowLocked(ctx, msg.ID, msg.BlessingsKey, msg.DischargeKey, false, true)
Matt Rosencrantz8dc67f22015-09-09 18:26:50 -0700271 f.worker.Release(ctx, int(msg.InitialCounters))
272 c.toRelease[msg.ID] = defaultBufferSize
273 c.borrowing[msg.ID] = true
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700274 c.mu.Unlock()
275 c.handler.HandleFlow(f)
Matt Rosencrantzbcaabf22015-09-11 14:08:53 -0700276 if err := f.q.put(ctx, msg.Payload); err != nil {
277 return err
278 }
279 if msg.Flags&message.CloseFlag != 0 {
280 f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
281 }
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700282
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700283 case *message.Release:
284 release := make([]flowcontrol.Release, 0, len(msg.Counters))
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700285 c.mu.Lock()
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700286 for fid, val := range msg.Counters {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700287 if f := c.flows[fid]; f != nil {
288 release = append(release, flowcontrol.Release{
289 Worker: f.worker,
290 Tokens: int(val),
291 })
292 }
293 }
294 c.mu.Unlock()
295 if err := c.fc.Release(ctx, release); err != nil {
296 return err
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700297 }
298
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700299 case *message.Data:
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700300 c.mu.Lock()
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700301 f := c.flows[msg.ID]
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700302 c.mu.Unlock()
303 if f == nil {
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700304 ctx.Infof("Ignoring data message for unknown flow on connection to %s: %d", c.remote, msg.ID)
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700305 return nil
306 }
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700307 if err := f.q.put(ctx, msg.Payload); err != nil {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700308 return err
309 }
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700310 if msg.Flags&message.CloseFlag != 0 {
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700311 f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
312 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700313
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700314 default:
315 return NewErrUnexpectedMsg(ctx, reflect.TypeOf(msg).String())
316 }
317 return nil
318}
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700319
Matt Rosencrantzb9af7a82015-08-17 17:41:29 -0700320func (c *Conn) readLoop(ctx *context.T) {
321 var err error
322 for {
323 msg, rerr := c.mp.readMsg(ctx)
324 if rerr != nil {
325 err = NewErrRecv(ctx, c.remote.String(), rerr)
326 break
327 }
328 if err = c.handleMessage(ctx, msg); err != nil {
329 break
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700330 }
331 }
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700332
333 c.mu.Lock()
Matt Rosencrantz8e8f1dd2015-08-21 22:26:21 -0700334 var flows map[uint64]*flw
Matt Rosencrantz75250d32015-08-19 08:13:05 -0700335 flows, c.flows = c.flows, nil
336 c.mu.Unlock()
337
338 c.loopWG.Done()
339 if flows != nil {
340 c.internalClose(ctx, err, flows)
341 }
Matt Rosencrantz07712e12015-07-31 18:45:25 -0700342}
Suharsh Sivakumard9bffe82015-08-24 16:35:29 -0700343
344func (c *Conn) markUsed() {
345 c.mu.Lock()
346 c.lastUsedTime = time.Now()
347 c.mu.Unlock()
348}
Suharsh Sivakumar2ebd6cf2015-09-22 16:37:31 -0700349
350func (c *Conn) IsEncapsulated() bool {
351 _, ok := c.mp.rw.(*flw)
352 return ok
353}