| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package manager |
| |
| import ( |
| "fmt" |
| "net" |
| "strings" |
| "sync" |
| "syscall" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/flow" |
| "v.io/v23/flow/message" |
| "v.io/v23/naming" |
| "v.io/v23/security" |
| "v.io/v23/verror" |
| |
| "v.io/x/lib/netconfig" |
| "v.io/x/lib/netstate" |
| "v.io/x/ref/lib/pubsub" |
| slib "v.io/x/ref/lib/security" |
| "v.io/x/ref/lib/stats" |
| iflow "v.io/x/ref/runtime/internal/flow" |
| "v.io/x/ref/runtime/internal/flow/conn" |
| "v.io/x/ref/runtime/internal/lib/upcqueue" |
| "v.io/x/ref/runtime/internal/rpc/version" |
| "v.io/x/ref/runtime/protocols/bidi" |
| ) |
| |
| const ( |
| reapCacheInterval = 5 * time.Minute |
| minCacheInterval = 10 * time.Millisecond // the minimum time we are willing to poll the cache for idle or closed connections. |
| handshakeTimeout = time.Minute |
| ) |
| |
| type manager struct { |
| rid naming.RoutingID |
| closed chan struct{} |
| cache *ConnCache |
| ls *listenState |
| ctx *context.T |
| acceptChannelTimeout time.Duration |
| idleExpiry time.Duration // time after which idle connections will be closed. |
| cacheTicker *time.Ticker |
| } |
| |
| type listenState struct { |
| q *upcqueue.T |
| listenLoops sync.WaitGroup |
| dhcpPublisher *pubsub.Publisher |
| serverAuthorizedPeers []security.BlessingPattern // empty list implies all peers are authorized to see the server's blessings. |
| |
| mu sync.Mutex |
| serverBlessings security.Blessings |
| serverNames []string |
| listeners map[flow.Listener]*endpointState |
| proxyEndpoints []naming.Endpoint |
| proxyErrors map[string]error |
| // TODO(suharshs): Look into making the struct{Protocol, Address string} into |
| // a named struct. This may involve changing v.io/v23/rpc.ListenAddrs. |
| listenErrors map[struct{ Protocol, Address string }]error |
| dirty chan struct{} |
| stopRoaming func() |
| proxyFlows map[string]flow.Flow // keyed by ep.String() |
| } |
| |
| type endpointState struct { |
| leps []naming.Endpoint // the list of currently active endpoints. |
| tmplEndpoint naming.Endpoint // endpoint used as a template for creating new endpoints from the network interfaces provided from roaming. |
| roaming bool |
| } |
| |
| // New creates a new flow manager. |
| func New( |
| ctx *context.T, |
| rid naming.RoutingID, |
| dhcpPublisher *pubsub.Publisher, |
| channelTimeout time.Duration, |
| idleExpiry time.Duration, |
| authorizedPeers []security.BlessingPattern) flow.Manager { |
| m := &manager{ |
| rid: rid, |
| closed: make(chan struct{}), |
| cache: NewConnCache(idleExpiry), |
| ctx: ctx, |
| acceptChannelTimeout: channelTimeout, |
| idleExpiry: idleExpiry, |
| } |
| var valid <-chan struct{} |
| if rid != naming.NullRoutingID { |
| m.ls = &listenState{ |
| q: upcqueue.New(), |
| listeners: make(map[flow.Listener]*endpointState), |
| dirty: make(chan struct{}), |
| dhcpPublisher: dhcpPublisher, |
| proxyFlows: make(map[string]flow.Flow), |
| proxyErrors: make(map[string]error), |
| listenErrors: make(map[struct{ Protocol, Address string }]error), |
| serverAuthorizedPeers: authorizedPeers, |
| } |
| p := v23.GetPrincipal(ctx) |
| m.ls.serverBlessings, valid = p.BlessingStore().Default() |
| m.ls.serverNames = security.BlessingNames(p, m.ls.serverBlessings) |
| } |
| // Pick a interval that is the minimum of the idleExpiry and the reapCacheInterval. |
| cacheInterval := reapCacheInterval |
| if idleExpiry > 0 && idleExpiry/2 < cacheInterval { |
| cacheInterval = idleExpiry / 2 |
| } |
| if cacheInterval < minCacheInterval { |
| cacheInterval = minCacheInterval |
| } |
| m.cacheTicker = time.NewTicker(cacheInterval) |
| |
| statsPrefix := naming.Join("rpc", "flow", rid.String()) |
| m.cache.ExportStats(naming.Join(statsPrefix, "conn-cache")) |
| go func() { |
| for { |
| select { |
| case <-valid: |
| m.ls.mu.Lock() |
| p := v23.GetPrincipal(ctx) |
| m.ls.serverBlessings, valid = p.BlessingStore().Default() |
| m.ls.serverNames = security.BlessingNames(p, m.ls.serverBlessings) |
| m.updateEndpointBlessingsLocked(m.ls.serverNames) |
| m.ls.mu.Unlock() |
| case <-ctx.Done(): |
| m.stopListening() |
| m.cache.Close(ctx) |
| stats.Delete(statsPrefix) |
| close(m.closed) |
| return |
| case <-m.cacheTicker.C: |
| // Periodically kill closed connections and remove expired connections, |
| // based on the idleExpiry passed to the NewConnCache constructor. |
| m.cache.KillConnections(ctx, 0) |
| } |
| } |
| }() |
| return m |
| } |
| |
| func (m *manager) stopListening() { |
| if m.ls == nil { |
| return |
| } |
| m.cacheTicker.Stop() |
| m.ls.mu.Lock() |
| listeners := m.ls.listeners |
| m.ls.listeners = nil |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = nil |
| } |
| stopRoaming := m.ls.stopRoaming |
| m.ls.stopRoaming = nil |
| for _, f := range m.ls.proxyFlows { |
| f.Close() |
| } |
| m.ls.mu.Unlock() |
| if stopRoaming != nil { |
| stopRoaming() |
| } |
| for ln := range listeners { |
| ln.Close() |
| } |
| m.ls.listenLoops.Wait() |
| } |
| |
| func (m *manager) StopListening(ctx *context.T) { |
| if m.ls == nil { |
| return |
| } |
| m.stopListening() |
| // Now no more connections can start. We should lame duck all the conns |
| // and wait for all of them to ack. |
| m.cache.EnterLameDuckMode(ctx) |
| // Now nobody should send any more flows, so close the queue. |
| m.ls.q.Close() |
| } |
| |
| // Listen causes the Manager to accept flows from the provided protocol and address. |
| // Listen may be called muliple times. |
| func (m *manager) Listen(ctx *context.T, protocol, address string) (<-chan struct{}, error) { |
| if m.ls == nil { |
| return nil, NewErrListeningWithNullRid(ctx) |
| } |
| |
| ln, lnErr := listen(ctx, protocol, address) |
| defer m.ls.mu.Unlock() |
| m.ls.mu.Lock() |
| if m.ls.listeners == nil { |
| if ln != nil { |
| ln.Close() |
| } |
| return nil, flow.NewErrBadState(ctx, NewErrManagerClosed(ctx)) |
| } |
| errKey := struct{ Protocol, Address string }{Protocol: protocol, Address: address} |
| if lnErr != nil { |
| m.ls.listenErrors[errKey] = lnErr |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| return nil, iflow.MaybeWrapError(flow.ErrNetwork, ctx, lnErr) |
| } |
| |
| local := naming.Endpoint{ |
| Protocol: protocol, |
| Address: ln.Addr().String(), |
| RoutingID: m.rid, |
| }.WithBlessingNames(m.ls.serverNames) |
| leps, roam, err := m.createEndpoints(ctx, local) |
| if err != nil { |
| m.ls.listenErrors[errKey] = err |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| return nil, iflow.MaybeWrapError(flow.ErrBadArg, ctx, err) |
| } |
| m.ls.listeners[ln] = &endpointState{ |
| leps: leps, |
| tmplEndpoint: local, |
| roaming: roam, |
| } |
| if m.ls.stopRoaming == nil && m.ls.dhcpPublisher != nil && roam { |
| ctx2, cancel := context.WithCancel(ctx) |
| ch := make(chan struct{}) |
| m.ls.stopRoaming = func() { |
| cancel() |
| <-ch |
| } |
| go m.monitorNetworkChanges(ctx2, ch) |
| } |
| |
| // The endpoints have changed on this successful listen so notify any watchers. |
| m.ls.listenErrors[errKey] = nil |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| |
| m.ls.listenLoops.Add(1) |
| acceptFailed := make(chan struct{}) |
| go m.lnAcceptLoop(ctx, ln, local, errKey, acceptFailed) |
| return acceptFailed, nil |
| } |
| |
| func (m *manager) monitorNetworkChanges(ctx *context.T, done chan<- struct{}) { |
| defer close(done) |
| change, err := netconfig.NotifyChange() |
| if err != nil { |
| ctx.Errorf("endpoints will not be updated if the network configuration changes, failed to monitor network changes: %v", err) |
| return |
| } |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case <-change: |
| netstate.InvalidateCache() |
| m.updateRoamingEndpoints(ctx) |
| if change, err = netconfig.NotifyChange(); err != nil { |
| ctx.Errorf("endpoints will not be updated if the network configuration changes, failed to monitor network changes: %v", err) |
| return |
| } |
| } |
| } |
| } |
| |
| func (m *manager) updateRoamingEndpoints(ctx *context.T) { |
| ctx.Infof("Network configuration may have changed, adjusting the addresses to listen on (routing id: %v)", m.rid) |
| changed := false |
| m.ls.mu.Lock() |
| defer m.ls.mu.Unlock() |
| for _, epState := range m.ls.listeners { |
| if !epState.roaming { |
| continue |
| } |
| newleps, _, err := m.createEndpoints(ctx, epState.tmplEndpoint) |
| if err != nil { |
| ctx.Infof("Unable to update roaming endpoints for template %v: %v", epState.tmplEndpoint, err) |
| continue |
| } |
| if len(newleps) != len(epState.leps) { |
| changed = true |
| } |
| if !changed { |
| newset := make(map[string]bool) |
| for _, lep := range newleps { |
| newset[lep.String()] = true |
| } |
| for _, lep := range epState.leps { |
| if !newset[lep.String()] { |
| changed = true |
| break |
| } |
| } |
| } |
| if changed { |
| ctx.Infof("Changing from %v to %v", epState.leps, newleps) |
| } |
| epState.leps = newleps |
| } |
| if changed { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| } |
| |
| func (m *manager) createEndpoints(ctx *context.T, lep naming.Endpoint) ([]naming.Endpoint, bool, error) { |
| iep := lep |
| if !strings.HasPrefix(iep.Protocol, "tcp") && |
| !strings.HasPrefix(iep.Protocol, "ws") { |
| // If not tcp, ws, or wsh, just return the endpoint we were given. |
| return []naming.Endpoint{iep}, false, nil |
| } |
| host, port, err := net.SplitHostPort(iep.Address) |
| if err != nil { |
| return nil, false, err |
| } |
| chooser := v23.GetListenSpec(ctx).AddressChooser |
| addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser) |
| if err != nil { |
| return nil, false, err |
| } |
| ieps := make([]naming.Endpoint, 0, len(addrs)) |
| for _, addr := range addrs { |
| n, err := naming.ParseEndpoint(lep.String()) |
| if err != nil { |
| return nil, false, err |
| } |
| n.Address = net.JoinHostPort(addr.String(), port) |
| ieps = append(ieps, n) |
| } |
| return ieps, unspecified, nil |
| } |
| |
| func (m *manager) updateEndpointBlessingsLocked(names []string) { |
| for _, eps := range m.ls.listeners { |
| eps.tmplEndpoint = eps.tmplEndpoint.WithBlessingNames(names) |
| for i := range eps.leps { |
| eps.leps[i] = eps.leps[i].WithBlessingNames(names) |
| } |
| } |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| } |
| |
| // ProxyListen causes the Manager to accept flows from the specified endpoint. |
| // The endpoint must correspond to a vanadium proxy. |
| // If error != nil, establishing a connection to the Proxy failed. |
| // Otherwise, if error == nil, the returned chan will block until the |
| // connection to the proxy endpoint fails. The caller may then choose to retry |
| // the connection. |
| // name is a identifier of the proxy. It can be used to access errors |
| // in ListenStatus.ProxyErrors. |
| func (m *manager) ProxyListen(ctx *context.T, name string, ep naming.Endpoint) (<-chan struct{}, error) { |
| if m.ls == nil { |
| return nil, NewErrListeningWithNullRid(ctx) |
| } |
| f, err := m.internalDial(ctx, ep, proxyAuthorizer{}, m.acceptChannelTimeout, true, false) |
| if err != nil { |
| return nil, err |
| } |
| k := ep.String() |
| m.ls.mu.Lock() |
| m.ls.proxyFlows[k] = f |
| serverNames := m.ls.serverNames |
| m.ls.mu.Unlock() |
| w, err := message.Append(ctx, &message.ProxyServerRequest{}, nil) |
| if err != nil { |
| m.updateProxyEndpoints(nil, name, err, k) |
| return nil, err |
| } |
| if _, err = f.WriteMsg(w); err != nil { |
| m.updateProxyEndpoints(nil, name, err, k) |
| return nil, err |
| } |
| // We connect to the proxy once before we loop. |
| if err := m.readAndUpdateProxyEndpoints(ctx, name, f, k, serverNames); err != nil { |
| return nil, err |
| } |
| // We do exponential backoff unless the proxy closes the flow cleanly, in which |
| // case we redial immediately. |
| done := make(chan struct{}) |
| go func() { |
| for { |
| // we keep reading updates until we encounter an error, usually because the |
| // flow has been closed. |
| if err := m.readAndUpdateProxyEndpoints(ctx, name, f, k, serverNames); err != nil { |
| ctx.VI(2).Info(err) |
| close(done) |
| return |
| } |
| } |
| }() |
| return done, nil |
| } |
| |
| func (m *manager) readAndUpdateProxyEndpoints(ctx *context.T, name string, f flow.Flow, flowKey string, serverNames []string) error { |
| eps, err := m.readProxyResponse(ctx, f) |
| if err == nil { |
| for i := range eps { |
| eps[i] = eps[i].WithBlessingNames(serverNames) |
| } |
| } |
| m.updateProxyEndpoints(eps, name, err, flowKey) |
| return err |
| } |
| |
| func (m *manager) updateProxyEndpoints(eps []naming.Endpoint, name string, err error, flowKey string) { |
| defer m.ls.mu.Unlock() |
| m.ls.mu.Lock() |
| if err != nil { |
| delete(m.ls.proxyFlows, flowKey) |
| } |
| origErrS, errS := "", "" |
| if err != nil { |
| errS = err.Error() |
| } |
| if origErr := m.ls.proxyErrors[name]; origErr != nil { |
| errS = origErr.Error() |
| } |
| if endpointsEqual(m.ls.proxyEndpoints, eps) && errS == origErrS { |
| return |
| } |
| m.ls.proxyEndpoints = eps |
| m.ls.proxyErrors[name] = err |
| // The proxy endpoints have changed so we need to notify any watchers to |
| // requery Status. |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| } |
| |
| func endpointsEqual(a, b []naming.Endpoint) bool { |
| if len(a) != len(b) { |
| return false |
| } |
| m := make(map[string]struct{}) |
| for _, ep := range a { |
| m[ep.String()] = struct{}{} |
| } |
| for _, ep := range b { |
| key := ep.String() |
| if _, ok := m[key]; !ok { |
| return false |
| } |
| delete(m, key) |
| } |
| return len(m) == 0 |
| } |
| |
| func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) { |
| b, err := f.ReadMsg() |
| if err != nil { |
| f.Close() |
| return nil, err |
| } |
| msg, err := message.Read(ctx, b) |
| if err != nil { |
| f.Close() |
| return nil, iflow.MaybeWrapError(flow.ErrBadArg, ctx, err) |
| } |
| switch m := msg.(type) { |
| case *message.ProxyResponse: |
| return m.Endpoints, nil |
| case *message.ProxyErrorResponse: |
| f.Close() |
| return nil, NewErrProxyResponse(ctx, m.Error) |
| default: |
| f.Close() |
| return nil, flow.NewErrBadArg(ctx, NewErrInvalidProxyResponse(ctx, fmt.Sprintf("%T", m))) |
| } |
| } |
| |
| type proxyAuthorizer struct{} |
| |
| func (proxyAuthorizer) AuthorizePeer( |
| ctx *context.T, |
| localEndpoint, remoteEndpoint naming.Endpoint, |
| remoteBlessings security.Blessings, |
| remoteDischarges map[string]security.Discharge, |
| ) ([]string, []security.RejectedBlessing, error) { |
| return nil, nil, nil |
| } |
| |
| func (a proxyAuthorizer) BlessingsForPeer(ctx *context.T, proxyBlessings []string) ( |
| security.Blessings, map[string]security.Discharge, error) { |
| blessings := v23.GetPrincipal(ctx).BlessingStore().ForPeer(proxyBlessings...) |
| discharges, _ := slib.PrepareDischarges(ctx, blessings, proxyBlessings, "", nil) |
| return blessings, discharges, nil |
| } |
| |
| func (m *manager) lnAcceptLoop(ctx *context.T, ln flow.Listener, local naming.Endpoint, |
| errKey struct{ Protocol, Address string }, acceptFailed chan struct{}) { |
| defer m.ls.listenLoops.Done() |
| defer func() { |
| close(acceptFailed) |
| delete(m.ls.listeners, ln) |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| }() |
| const killConnectionsRetryDelay = 5 * time.Millisecond |
| for { |
| flowConn, err := ln.Accept(ctx) |
| for tokill := 1; isTemporaryError(err); tokill *= 2 { |
| if isTooManyOpenFiles(err) { |
| if err := m.cache.KillConnections(ctx, tokill); err != nil { |
| ctx.VI(2).Infof("failed to kill connections: %v", err) |
| continue |
| } |
| } else { |
| tokill = 1 |
| } |
| time.Sleep(killConnectionsRetryDelay) |
| flowConn, err = ln.Accept(ctx) |
| } |
| if err != nil { |
| m.ls.mu.Lock() |
| closed := m.ls.listeners == nil |
| m.ls.mu.Unlock() |
| if !closed { |
| ctx.Errorf("ln.Accept on localEP %v failed: %v", local, err) |
| return |
| } |
| m.ls.mu.Lock() |
| m.ls.listenErrors[errKey] = err |
| if m.ls.dirty != nil { |
| close(m.ls.dirty) |
| m.ls.dirty = make(chan struct{}) |
| } |
| m.ls.mu.Unlock() |
| return |
| } |
| |
| m.ls.mu.Lock() |
| if m.ls.listeners == nil { |
| m.ls.mu.Unlock() |
| return |
| } |
| m.ls.listenLoops.Add(1) |
| m.ls.mu.Unlock() |
| fh := &flowHandler{m, make(chan struct{})} |
| go func() { |
| defer m.ls.listenLoops.Done() |
| c, err := conn.NewAccepted( |
| m.ctx, |
| m.ls.serverAuthorizedPeers, |
| flowConn, |
| local, |
| version.Supported, |
| handshakeTimeout, |
| m.acceptChannelTimeout, |
| fh) |
| if err != nil { |
| ctx.Errorf("failed to accept flow.Conn on localEP %v failed: %v", local, err) |
| flowConn.Close() |
| } else if err = m.cache.InsertWithRoutingID(c, false); err != nil { |
| ctx.Errorf("failed to cache conn %v: %v", c, err) |
| c.Close(ctx, err) |
| } |
| close(fh.cached) |
| }() |
| } |
| } |
| |
| type hybridHandler struct { |
| handler conn.FlowHandler |
| ready chan struct{} |
| } |
| |
| func (h *hybridHandler) HandleFlow(f flow.Flow) error { |
| <-h.ready |
| return h.handler.HandleFlow(f) |
| } |
| |
| func (m *manager) handlerReady(fh conn.FlowHandler, proxy bool) { |
| if fh != nil { |
| if h, ok := fh.(*hybridHandler); ok { |
| if proxy { |
| h.handler = &proxyFlowHandler{m: m} |
| } else { |
| h.handler = &flowHandler{m: m} |
| } |
| close(h.ready) |
| } |
| } |
| } |
| |
| func newHybridHandler(m *manager) *hybridHandler { |
| return &hybridHandler{ |
| ready: make(chan struct{}), |
| } |
| } |
| |
| type flowHandler struct { |
| m *manager |
| cached chan struct{} |
| } |
| |
| func (h *flowHandler) HandleFlow(f flow.Flow) error { |
| if h.cached != nil { |
| <-h.cached |
| } |
| return h.m.ls.q.Put(f) |
| } |
| |
| type proxyFlowHandler struct { |
| m *manager |
| } |
| |
| func (h *proxyFlowHandler) HandleFlow(f flow.Flow) error { |
| go func() { |
| fh := &flowHandler{h.m, make(chan struct{})} |
| h.m.ls.mu.Lock() |
| if h.m.ls.listeners == nil { |
| // If we've entered lame duck mode we want to reject new flows |
| // from the proxy. This should come out as a connection failure |
| // for the client, which will result in a retry. |
| h.m.ls.mu.Unlock() |
| f.Close() |
| return |
| } |
| h.m.ls.mu.Unlock() |
| c, err := conn.NewAccepted( |
| h.m.ctx, |
| h.m.ls.serverAuthorizedPeers, |
| f, |
| f.LocalEndpoint(), |
| version.Supported, |
| handshakeTimeout, |
| h.m.acceptChannelTimeout, |
| fh) |
| if err != nil { |
| h.m.ctx.Errorf("failed to create accepted conn: %v", err) |
| } else if err = h.m.cache.InsertWithRoutingID(c, false); err != nil { |
| h.m.ctx.Errorf("failed to create accepted conn: %v", err) |
| } |
| close(fh.cached) |
| }() |
| return nil |
| } |
| |
| // Status returns the current flow.ListenStatus of the manager. |
| func (m *manager) Status() flow.ListenStatus { |
| var status flow.ListenStatus |
| if m.ls == nil { |
| return status |
| } |
| m.ls.mu.Lock() |
| status.Endpoints = make([]naming.Endpoint, len(m.ls.proxyEndpoints)) |
| copy(status.Endpoints, m.ls.proxyEndpoints) |
| for _, epState := range m.ls.listeners { |
| for _, ep := range epState.leps { |
| status.Endpoints = append(status.Endpoints, ep) |
| } |
| } |
| status.ProxyErrors = make(map[string]error, len(m.ls.proxyErrors)) |
| for k, v := range m.ls.proxyErrors { |
| status.ProxyErrors[k] = v |
| } |
| status.ListenErrors = make(map[struct{ Protocol, Address string }]error, len(m.ls.listenErrors)) |
| for k, v := range m.ls.listenErrors { |
| status.ListenErrors[k] = v |
| } |
| status.Dirty = m.ls.dirty |
| m.ls.mu.Unlock() |
| if len(status.Endpoints) == 0 { |
| status.Endpoints = append(status.Endpoints, naming.Endpoint{Protocol: bidi.Name, RoutingID: m.rid}) |
| } |
| return status |
| } |
| |
| // Accept blocks until a new Flow has been initiated by a remote process. |
| // Flows are accepted from addresses that the Manager is listening on, |
| // including outgoing dialed connections. |
| // |
| // For example: |
| // err := m.Listen(ctx, "tcp", ":0") |
| // for { |
| // flow, err := m.Accept(ctx) |
| // // process flow |
| // } |
| // |
| // can be used to accept Flows initiated by remote processes. |
| func (m *manager) Accept(ctx *context.T) (flow.Flow, error) { |
| if m.ls == nil { |
| return nil, NewErrListeningWithNullRid(ctx) |
| } |
| item, err := m.ls.q.Get(ctx.Done()) |
| switch { |
| case err == upcqueue.ErrQueueIsClosed: |
| return nil, flow.NewErrNetwork(ctx, NewErrManagerClosed(ctx)) |
| case err != nil: |
| return nil, flow.NewErrNetwork(ctx, NewErrAcceptFailed(ctx, err)) |
| default: |
| return item.(flow.Flow), nil |
| } |
| } |
| |
| // Dial creates a Flow to the provided remote endpoint, using 'auth' to |
| // determine the blessings that will be sent to the remote end. |
| // |
| // If the manager has a non-null RoutingID, the Manager will re-use connections |
| // by Listening on Dialed connections for the lifetime of the Dialed connection. |
| // |
| // channelTimeout specifies the duration we are willing to wait before determining |
| // that connections managed by this Manager are unhealthy and should be |
| // closed. |
| func (m *manager) Dial(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) { |
| return m.internalDial(ctx, remote, auth, channelTimeout, false, false) |
| } |
| |
| // DialSideChannel behaves the same as Dial, except that the returned flow is |
| // not factored in when deciding the underlying connection's idleness, etc. |
| func (m *manager) DialSideChannel(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) { |
| return m.internalDial(ctx, remote, auth, channelTimeout, false, true) |
| } |
| |
| // DialCached creates a Flow to the provided remote endpoint using only cached |
| // connections from previous Listen or Dial calls. |
| // If no cached connection exists, an error will be returned. |
| // |
| // 'auth' is used to determine the blessings that will be sent to the remote end. |
| // |
| // channelTimeout specifies the duration we are willing to wait before determining |
| // that connections managed by this Manager are unhealthy and should be |
| // closed. |
| func (m *manager) DialCached(ctx *context.T, remote naming.Endpoint, auth flow.PeerAuthorizer, channelTimeout time.Duration) (flow.Flow, error) { |
| var ( |
| err error |
| cached cachedConn |
| names []string |
| rejected []security.RejectedBlessing |
| addr = remote.Addr() |
| unresNetwork, unresAddress = addr.Network(), addr.String() |
| protocol, _ = flow.RegisteredProtocol(unresNetwork) |
| ) |
| if rid := remote.RoutingID; rid != naming.NullRoutingID { |
| // In the case the endpoint has a RoutingID we only want to check the cache |
| // for the RoutingID because we want to guarantee that the connection we |
| // return is to the end server and not a connection to an intermediate proxy. |
| cached, names, rejected, err = m.cache.FindWithRoutingID(ctx, remote, auth) |
| } else { |
| cached, names, rejected, err = m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol) |
| if err == nil { |
| m.cache.Unreserve(unresNetwork, unresAddress) |
| } |
| } |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err) |
| } |
| if cached == nil { |
| return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, NewErrConnNotInCache(ctx, remote.String())) |
| } |
| c := cached.(*conn.Conn) |
| return dialFlow(ctx, c, remote, names, rejected, channelTimeout, auth, false) |
| } |
| |
| func (m *manager) internalDial( |
| ctx *context.T, |
| remote naming.Endpoint, |
| auth flow.PeerAuthorizer, |
| channelTimeout time.Duration, |
| proxy, sideChannel bool) (flow.Flow, error) { |
| // Fast path, look for the conn based on unresolved network, address, and routingId first. |
| var ( |
| c *conn.Conn |
| addr = remote.Addr() |
| unresNetwork, unresAddress = addr.Network(), addr.String() |
| protocol, _ = flow.RegisteredProtocol(unresNetwork) |
| ) |
| if m.ls != nil && len(m.ls.serverAuthorizedPeers) > 0 { |
| auth = &peerAuthorizer{auth, m.ls.serverAuthorizedPeers} |
| } |
| cached, names, rejected, err := m.cache.Find(ctx, remote, unresNetwork, unresAddress, auth, protocol) |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err) |
| } |
| if cached != nil { |
| c = cached.(*conn.Conn) |
| m.cache.Unreserve(unresNetwork, unresAddress) |
| } else { |
| // We didn't find the connection we want in the cache. Dial it. |
| defer m.cache.Unreserve(unresNetwork, unresAddress) |
| flowConn, err := dial(ctx, protocol, unresNetwork, unresAddress) |
| if err != nil { |
| switch err := err.(type) { |
| case *net.OpError: |
| if err, ok := err.Err.(net.Error); ok && err.Timeout() { |
| return nil, iflow.MaybeWrapError(verror.ErrTimeout, ctx, err) |
| } |
| } |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err) |
| } |
| var fh conn.FlowHandler |
| if m.ls != nil { |
| m.ls.mu.Lock() |
| if stoppedListening := m.ls.listeners == nil; !stoppedListening { |
| fh = newHybridHandler(m) |
| } |
| m.ls.mu.Unlock() |
| } |
| c, names, rejected, err = conn.NewDialed( |
| ctx, |
| flowConn, |
| localEndpoint(flowConn, m.rid), |
| remote, |
| version.Supported, |
| auth, |
| handshakeTimeout, |
| 0, |
| fh, |
| ) |
| if err != nil { |
| flowConn.Close() |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err) |
| } |
| isProxy := proxy || !c.MatchesRID(remote) |
| if err := m.cache.Insert(c, isProxy); err != nil { |
| c.Close(ctx, err) |
| return nil, flow.NewErrBadState(ctx, err) |
| } |
| // Now that c is in the cache we can explicitly unreserve. |
| m.cache.Unreserve(unresNetwork, unresAddress) |
| m.handlerReady(fh, proxy) |
| } |
| |
| // If the connection we found or dialed turns out to not be the person we expected, assume it is a Proxy. |
| // We now need to make a flow to a proxy and upgrade it to a conn to the final server. |
| if !c.MatchesRID(remote) { |
| proxyConn := c |
| f, err := proxyConn.Dial(ctx, security.Blessings{}, nil, remote, channelTimeout, false) |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err) |
| } |
| var fh conn.FlowHandler |
| if m.ls != nil { |
| m.ls.mu.Lock() |
| if stoppedListening := m.ls.listeners == nil; !stoppedListening { |
| fh = &flowHandler{m: m} |
| } |
| m.ls.mu.Unlock() |
| } |
| c, names, rejected, err = conn.NewDialed( |
| ctx, |
| f, |
| proxyConn.LocalEndpoint(), |
| remote, |
| version.Supported, |
| auth, |
| handshakeTimeout, |
| 0, |
| fh, |
| ) |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err) |
| } |
| if err := m.cache.InsertWithRoutingID(c, false); err != nil { |
| c.Close(ctx, err) |
| return nil, iflow.MaybeWrapError(flow.ErrBadState, ctx, err) |
| } |
| } |
| |
| return dialFlow(ctx, c, remote, names, rejected, channelTimeout, auth, sideChannel) |
| } |
| |
| func dialFlow(ctx *context.T, c *conn.Conn, remote naming.Endpoint, names []string, rejected []security.RejectedBlessing, |
| channelTimeout time.Duration, auth flow.PeerAuthorizer, sideChannel bool) (flow.Flow, error) { |
| // Find the proper blessings and dial the final flow. |
| blessings, discharges, err := auth.BlessingsForPeer(ctx, names) |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, NewErrNoBlessingsForPeer(ctx, names, rejected, err)) |
| } |
| f, err := c.Dial(ctx, blessings, discharges, remote, channelTimeout, sideChannel) |
| if err != nil { |
| return nil, iflow.MaybeWrapError(flow.ErrDialFailed, ctx, err) |
| } |
| return f, nil |
| } |
| |
| // RoutingID returns the naming.Routing of the flow.Manager. |
| func (m *manager) RoutingID() naming.RoutingID { |
| return m.rid |
| } |
| |
| // Closed returns a channel that remains open for the lifetime of the Manager |
| // object. Once the channel is closed any operations on the Manager will |
| // necessarily fail. |
| func (m *manager) Closed() <-chan struct{} { |
| return m.closed |
| } |
| |
| func dial(ctx *context.T, p flow.Protocol, protocol, address string) (flow.Conn, error) { |
| if p != nil { |
| var timeout time.Duration |
| if dl, ok := ctx.Deadline(); ok { |
| timeout = dl.Sub(time.Now()) |
| } |
| type connAndErr struct { |
| c flow.Conn |
| e error |
| } |
| ch := make(chan connAndErr, 1) |
| go func() { |
| conn, err := p.Dial(ctx, protocol, address, timeout) |
| ch <- connAndErr{conn, err} |
| }() |
| select { |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| case cae := <-ch: |
| return cae.c, cae.e |
| } |
| } |
| return nil, NewErrUnknownProtocol(ctx, protocol) |
| } |
| |
| func listen(ctx *context.T, protocol, address string) (flow.Listener, error) { |
| if p, _ := flow.RegisteredProtocol(protocol); p != nil { |
| ln, err := p.Listen(ctx, protocol, address) |
| if err != nil { |
| return nil, err |
| } |
| return ln, nil |
| } |
| return nil, NewErrUnknownProtocol(ctx, protocol) |
| } |
| |
| func localEndpoint(conn flow.Conn, rid naming.RoutingID) naming.Endpoint { |
| localAddr := conn.LocalAddr() |
| ep := naming.Endpoint{ |
| Protocol: localAddr.Network(), |
| Address: localAddr.String(), |
| RoutingID: rid, |
| } |
| return ep |
| } |
| |
| func isTemporaryError(err error) bool { |
| oErr, ok := err.(*net.OpError) |
| return ok && oErr.Temporary() |
| } |
| |
| func isTooManyOpenFiles(err error) bool { |
| oErr, ok := err.(*net.OpError) |
| return ok && strings.Contains(oErr.Err.Error(), syscall.EMFILE.Error()) |
| } |
| |
| // peerAuthorizer implements flow.PeerAuthorizer. It is meant to be used |
| // when a server operating in private mode (i.e., with a non-empty set |
| // of authorized peers) acts as a client. It wraps around the PeerAuthorizer |
| // specified by the call opts and addiitonally ensures that any peers that |
| // the client communicates with belong to the set of authorized peers. |
| type peerAuthorizer struct { |
| auth flow.PeerAuthorizer |
| authorizedPeers []security.BlessingPattern |
| } |
| |
| func (x *peerAuthorizer) AuthorizePeer( |
| ctx *context.T, |
| localEP, remoteEP naming.Endpoint, |
| remoteBlessings security.Blessings, |
| remoteDischarges map[string]security.Discharge) ([]string, []security.RejectedBlessing, error) { |
| localPrincipal := v23.GetPrincipal(ctx) |
| // The "Method" and "Suffix" fields of the call are not populated |
| // as they are considered irrelevant for authorizing server blessings. |
| call := security.NewCall(&security.CallParams{ |
| Timestamp: time.Now(), |
| LocalPrincipal: localPrincipal, |
| LocalEndpoint: localEP, |
| RemoteBlessings: remoteBlessings, |
| RemoteDischarges: remoteDischarges, |
| RemoteEndpoint: remoteEP, |
| }) |
| |
| peerNames, rejectedPeerNames := security.RemoteBlessingNames(ctx, call) |
| for _, p := range x.authorizedPeers { |
| if p.MatchedBy(peerNames...) { |
| return x.auth.AuthorizePeer(ctx, localEP, remoteEP, remoteBlessings, remoteDischarges) |
| } |
| } |
| return nil, nil, fmt.Errorf("peer names: %v (rejected: %v) do not match one of the authorized patterns: %v", peerNames, rejectedPeerNames, x.authorizedPeers) |
| } |
| |
| func (x *peerAuthorizer) BlessingsForPeer(ctx *context.T, peerNames []string) ( |
| security.Blessings, map[string]security.Discharge, error) { |
| return x.auth.BlessingsForPeer(ctx, peerNames) |
| } |