| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package proxy |
| |
| import ( |
| "fmt" |
| "io" |
| |
| "v.io/x/ref/runtime/internal/lib/iobuf" |
| "v.io/x/ref/runtime/internal/rpc/stream" |
| "v.io/x/ref/runtime/internal/rpc/stream/crypto" |
| "v.io/x/ref/runtime/internal/rpc/stream/manager" |
| "v.io/x/ref/runtime/internal/rpc/stream/message" |
| |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/security" |
| "v.io/x/lib/vlog" |
| ) |
| |
| var nullCipher = crypto.NullControlCipher{} |
| |
| type proxy struct { |
| mgr stream.XManager |
| principal security.Principal |
| } |
| |
| func New(protocol, address string, rid naming.RoutingID, ctx *context.T, principal security.Principal) (*proxy, func(), naming.Endpoint, error) { |
| proxy := &proxy{ |
| mgr: manager.XInternalNew(ctx, rid), |
| principal: principal, |
| } |
| ln, ep, err := proxy.mgr.Listen(protocol, address, proxy.principal, proxy.principal.BlessingStore().Default()) |
| if err != nil { |
| return nil, nil, nil, err |
| } |
| go proxy.listenLoop(ln) |
| // TODO(suharshs): implement proxy close function. |
| return proxy, func() {}, ep, nil |
| } |
| |
| func (p *proxy) listenLoop(ln stream.XListener) { |
| for { |
| f, err := ln.Accept() |
| if err != nil { |
| vlog.Errorf("ln.Accept failed: %v", err) |
| } |
| go func() { |
| if err := p.processFlow(f); err != nil { |
| vlog.Errorf("processFlow failed: %v", err) |
| } |
| }() |
| } |
| } |
| |
| func (p *proxy) processFlow(f stream.XFlow) error { |
| r := iobuf.NewReader(iobuf.NewPool(0), f) |
| m, err := message.ReadFrom(r, nullCipher) |
| if err != nil { |
| return fmt.Errorf("failed to read message: %v", err) |
| } |
| switch msg := m.(type) { |
| case *message.SetupVC: |
| fout, err := p.mgr.Dial(msg.RemoteEndpoint, p.principal) |
| if err != nil { |
| return fmt.Errorf("p.mgr.Dial: %v", err) |
| } |
| // Forward the setupVC message. |
| if err := message.WriteTo(fout, msg, nullCipher); err != nil { |
| return fmt.Errorf("message.WriteTo(fout): %v", err) |
| } |
| // Disable encryption on the flows since all data sent after the setupVC message |
| // will be encryption between the ends. |
| f.DisableEncryption() |
| fout.DisableEncryption() |
| go p.forwardLoop(f, fout) |
| go p.forwardLoop(fout, f) |
| return nil |
| default: |
| return fmt.Errorf("unexpected message: %v", m) |
| } |
| } |
| |
| func (p *proxy) forwardLoop(fin, fout stream.XFlow) { |
| for { |
| _, err := io.Copy(fin, fout) |
| if err == io.EOF { |
| return |
| } else if err != nil { |
| vlog.Errorf("f.Read failed: %v", err) |
| return |
| } |
| } |
| } |