blob: 83283111482e18d0361c356bd15189160eba05af [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proxy
import (
"fmt"
"io"
"v.io/x/ref/runtime/internal/lib/iobuf"
"v.io/x/ref/runtime/internal/rpc/stream"
"v.io/x/ref/runtime/internal/rpc/stream/crypto"
"v.io/x/ref/runtime/internal/rpc/stream/manager"
"v.io/x/ref/runtime/internal/rpc/stream/message"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security"
"v.io/x/lib/vlog"
)
// nullCipher is the control cipher handed to message.ReadFrom and
// message.WriteTo when the proxy reads and relays the initial SetupVC message.
// NOTE(review): presumably a no-op cipher, going by its type name — confirm
// against the crypto package.
var nullCipher = crypto.NullControlCipher{}
// proxy accepts incoming flows and relays them to the endpoint named in each
// flow's initial SetupVC message.
type proxy struct {
// mgr is the stream manager used both to listen for incoming flows and to
// dial the outgoing flow for each accepted one.
mgr stream.XManager
// principal is the identity passed to mgr.Listen and mgr.Dial.
principal security.Principal
}
// New builds a proxy backed by a stream manager created from ctx and rid, and
// starts listening on the given protocol and address using principal and the
// default blessings from principal's blessing store.
//
// On success it returns the proxy, a close function, the endpoint reported by
// Listen, and a nil error; accepted flows are served on a background
// goroutine. The close function is currently a no-op. If Listen fails, the
// error is returned and the other three results are nil.
func New(protocol, address string, rid naming.RoutingID, ctx *context.T, principal security.Principal) (*proxy, func(), naming.Endpoint, error) {
	p := &proxy{principal: principal}
	p.mgr = manager.XInternalNew(ctx, rid)

	blessings := principal.BlessingStore().Default()
	ln, ep, err := p.mgr.Listen(protocol, address, principal, blessings)
	if err != nil {
		return nil, nil, nil, err
	}

	// Serve accepted flows in the background so New can return right away.
	go p.listenLoop(ln)

	// TODO(suharshs): implement proxy close function.
	noop := func() {}
	return p, noop, ep, nil
}
// listenLoop accepts flows from ln and hands each one to processFlow on its
// own goroutine, so handling one flow does not delay accepting the next.
//
// The loop ends at the first Accept error: the error is logged and the method
// returns. It neither processes the flow value returned alongside the error
// nor keeps calling Accept on a listener that has failed.
func (p *proxy) listenLoop(ln stream.XListener) {
	for {
		f, err := ln.Accept()
		if err != nil {
			vlog.Errorf("ln.Accept failed: %v", err)
			return
		}
		go func() {
			if err := p.processFlow(f); err != nil {
				vlog.Errorf("processFlow failed: %v", err)
			}
		}()
	}
}
// processFlow handles one accepted flow. It reads a single message from f
// using nullCipher and requires it to be a SetupVC; any other message is
// rejected with an error. For a SetupVC it dials msg.RemoteEndpoint, relays
// the SetupVC message on the dialed flow, disables encryption on both flows,
// and starts one forwardLoop goroutine per direction.
//
// A nil return means the forwarding goroutines were started, not that
// forwarding has completed.
func (p *proxy) processFlow(f stream.XFlow) error {
	first, err := message.ReadFrom(iobuf.NewReader(iobuf.NewPool(0), f), nullCipher)
	if err != nil {
		return fmt.Errorf("failed to read message: %v", err)
	}

	setup, ok := first.(*message.SetupVC)
	if !ok {
		return fmt.Errorf("unexpected message: %v", first)
	}

	out, dialErr := p.mgr.Dial(setup.RemoteEndpoint, p.principal)
	if dialErr != nil {
		return fmt.Errorf("p.mgr.Dial: %v", dialErr)
	}

	// Relay the SetupVC message to the dialed endpoint.
	if writeErr := message.WriteTo(out, setup, nullCipher); writeErr != nil {
		return fmt.Errorf("message.WriteTo(fout): %v", writeErr)
	}

	// Turn off encryption on both flows: all data sent after the SetupVC
	// message will be encrypted between the two ends.
	f.DisableEncryption()
	out.DisableEncryption()

	go p.forwardLoop(f, out)
	go p.forwardLoop(out, f)
	return nil
}
// forwardLoop copies data from fin to fout until fin reaches end-of-stream or
// a read or write fails, then returns. It is meant to run on its own
// goroutine, one per direction.
//
// io.Copy returns a nil error when the source reaches EOF, so it is called
// exactly once; calling it again after a nil return would spin forever on an
// exhausted flow. An io.EOF error is treated as a normal end of stream and is
// not logged; any other error is logged.
func (p *proxy) forwardLoop(fin, fout stream.XFlow) {
	// io.Copy takes (dst, src): read from fin, write to fout.
	_, err := io.Copy(fout, fin)
	if err != nil && err != io.EOF {
		vlog.Errorf("f.Read failed: %v", err)
	}
}