blob: d05a56155882cc7606511659bf8a445ff03252f9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package xproxyd
6
7import (
Suharsh Sivakumar98143772015-09-10 11:40:53 -07008 "fmt"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -07009 "io"
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070010 "sync"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070011
12 "v.io/v23"
13 "v.io/v23/context"
14 "v.io/v23/flow"
Suharsh Sivakumar98143772015-09-10 11:40:53 -070015 "v.io/v23/flow/message"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070016 "v.io/v23/naming"
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070017)
18
// TODO(suharshs): Make sure that we don't leak any goroutines.
20
21type proxy struct {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070022 m flow.Manager
23 mu sync.Mutex
24 proxyEndpoints []naming.Endpoint
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070025}
26
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070027func New(ctx *context.T) (*proxy, *context.T, error) {
28 ctx, mgr, err := v23.ExperimentalWithNewFlowManager(ctx)
29 if err != nil {
30 return nil, nil, err
31 }
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070032 p := &proxy{
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070033 m: mgr,
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070034 }
35 for _, addr := range v23.GetListenSpec(ctx).Addrs {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070036 if addr.Protocol == "v23" {
37 ep, err := v23.NewEndpoint(addr.Address)
38 if err != nil {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070039 return nil, nil, err
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070040 }
41 f, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
42 if err != nil {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070043 return nil, nil, err
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070044 }
45 // Send a byte telling the acceptor that we are a proxy.
Suharsh Sivakumar98143772015-09-10 11:40:53 -070046 if err := writeMessage(ctx, &message.MultiProxyRequest{}, f); err != nil {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070047 return nil, nil, err
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070048 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -070049 msg, err := readMessage(ctx, f)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070050 if err != nil {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070051 return nil, nil, err
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070052 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -070053 m, ok := msg.(*message.ProxyResponse)
54 if !ok {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070055 return nil, nil, NewErrUnexpectedMessage(ctx, fmt.Sprintf("%t", m))
Suharsh Sivakumar98143772015-09-10 11:40:53 -070056 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070057 p.mu.Lock()
Suharsh Sivakumar98143772015-09-10 11:40:53 -070058 p.proxyEndpoints = append(p.proxyEndpoints, m.Endpoints...)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070059 p.mu.Unlock()
60 } else if err := p.m.Listen(ctx, addr.Protocol, addr.Address); err != nil {
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070061 return nil, nil, err
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070062 }
63 }
64 go p.listenLoop(ctx)
Suharsh Sivakumare0feb272015-09-14 18:03:48 -070065 return p, ctx, nil
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070066}
67
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070068func (p *proxy) ListeningEndpoints() []naming.Endpoint {
69 return p.m.ListeningEndpoints()
70}
71
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070072func (p *proxy) listenLoop(ctx *context.T) {
73 for {
74 f, err := p.m.Accept(ctx)
75 if err != nil {
76 ctx.Infof("p.m.Accept failed: %v", err)
77 break
78 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -070079 msg, err := readMessage(ctx, f)
80 if err != nil {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070081 ctx.Errorf("reading type byte failed: %v", err)
82 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -070083 switch m := msg.(type) {
84 case *message.Setup:
85 err = p.startRouting(ctx, f, m)
86 case *message.MultiProxyRequest:
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070087 err = p.replyToProxy(ctx, f)
Suharsh Sivakumar98143772015-09-10 11:40:53 -070088 case *message.ProxyServerRequest:
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070089 err = p.replyToServer(ctx, f)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -070090 default:
91 continue
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -070092 }
93 if err != nil {
94 ctx.Errorf("failed to handle incoming connection: %v", err)
95 }
96 }
97}
98
Suharsh Sivakumar98143772015-09-10 11:40:53 -070099func (p *proxy) startRouting(ctx *context.T, f flow.Flow, m *message.Setup) error {
100 fout, err := p.dialNextHop(ctx, f, m)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700101 if err != nil {
102 return err
103 }
104 go p.forwardLoop(ctx, f, fout)
105 go p.forwardLoop(ctx, fout, f)
106 return nil
107}
108
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700109func (p *proxy) forwardLoop(ctx *context.T, fin, fout flow.Flow) {
110 for {
111 _, err := io.Copy(fin, fout)
112 if err == io.EOF {
113 return
114 } else if err != nil {
115 ctx.Errorf("f.Read failed: %v", err)
116 return
117 }
118 }
119}
120
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700121func (p *proxy) dialNextHop(ctx *context.T, f flow.Flow, m *message.Setup) (flow.Flow, error) {
122 var (
123 rid naming.RoutingID
124 ep naming.Endpoint
125 err error
126 )
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700127 if routes := m.PeerRemoteEndpoint.Routes(); len(routes) > 0 {
128 if err := rid.FromString(routes[0]); err != nil {
129 return nil, err
130 }
131 // Make an endpoint with the correct routingID to dial out. All other fields
132 // do not matter.
133 // TODO(suharshs): Make sure that the routingID from the route belongs to a
134 // connection that is stored in the manager's cache. (i.e. a Server has connected
135 // with the routingID before)
136 if ep, err = setEndpointRoutingID(m.PeerRemoteEndpoint, rid); err != nil {
137 return nil, err
138 }
139 // Remove the read route from the setup message endpoint.
140 if m.PeerRemoteEndpoint, err = setEndpointRoutes(m.PeerRemoteEndpoint, routes[1:]); err != nil {
141 return nil, err
142 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700143 } else {
144 ep = m.PeerRemoteEndpoint
145 }
146 fout, err := p.m.Dial(ctx, ep, proxyBlessingsForPeer{}.run)
Suharsh Sivakumara2e2b742015-08-26 15:54:40 -0700147 if err != nil {
148 return nil, err
149 }
150 // Write the setup message back onto the flow for the next hop to read.
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700151 return fout, writeMessage(ctx, m, fout)
Suharsh Sivakumara2e2b742015-08-26 15:54:40 -0700152}
153
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700154func (p *proxy) replyToServer(ctx *context.T, f flow.Flow) error {
155 rid := f.Conn().RemoteEndpoint().RoutingID()
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700156 eps, err := p.returnEndpoints(ctx, rid, "")
Suharsh Sivakumara2e2b742015-08-26 15:54:40 -0700157 if err != nil {
158 return err
159 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700160 return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700161}
162
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700163func (p *proxy) replyToProxy(ctx *context.T, f flow.Flow) error {
164 // Add the routing id of the incoming proxy to the routes. The routing id of the
165 // returned endpoint doesn't matter because it will eventually be replaced
166 // by a server's rid by some later proxy.
167 // TODO(suharshs): Use a local route instead of this global routingID.
168 rid := f.Conn().RemoteEndpoint().RoutingID()
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700169 eps, err := p.returnEndpoints(ctx, naming.NullRoutingID, rid.String())
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700170 if err != nil {
171 return err
172 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700173 return writeMessage(ctx, &message.ProxyResponse{Endpoints: eps}, f)
Suharsh Sivakumarfb4af952015-08-21 17:51:01 -0700174}
Suharsh Sivakumara2e2b742015-08-26 15:54:40 -0700175
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700176func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700177 p.mu.Lock()
178 eps := append(p.m.ListeningEndpoints(), p.proxyEndpoints...)
179 p.mu.Unlock()
180 if len(eps) == 0 {
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700181 return nil, NewErrNotListening(ctx)
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700182 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700183 for idx, ep := range eps {
184 var err error
185 if rid != naming.NullRoutingID {
186 ep, err = setEndpointRoutingID(ep, rid)
187 if err != nil {
188 return nil, err
189 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700190 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700191 if len(route) > 0 {
192 var cp []string
193 cp = append(cp, ep.Routes()...)
194 cp = append(cp, route)
195 ep, err = setEndpointRoutes(ep, cp)
196 if err != nil {
197 return nil, err
198 }
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700199 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700200 eps[idx] = ep
Suharsh Sivakumar6734a792015-09-04 12:39:15 -0700201 }
Suharsh Sivakumar98143772015-09-10 11:40:53 -0700202 return eps, nil
Suharsh Sivakumara2e2b742015-08-26 15:54:40 -0700203}