blob: ea047385faac8969509ae64db805868560c8650d [file] [log] [blame]
package impl
import (
"fmt"
"io"
"veyron/examples/tunnel"
"veyron2/vlog"
)
func runIOManager(stdin io.Writer, stdout, stderr io.Reader, ptyFd uintptr, stream tunnel.TunnelServiceShellStream) error {
m := ioManager{stdin: stdin, stdout: stdout, stderr: stderr, ptyFd: ptyFd, stream: stream}
return m.run()
}
// ioManager manages the forwarding of all the data between the shell and the
// stream.
type ioManager struct {
stdin io.Writer
stdout, stderr io.Reader
ptyFd uintptr
stream tunnel.TunnelServiceShellStream
// done receives any error from chan2stream, user2stream, or
// stream2user.
done chan error
// outchan is used to serialize the output to the stream. This is
// needed because stream.Send is not thread-safe.
outchan chan tunnel.ServerShellPacket
}
func (m *ioManager) run() error {
// done receives any error from chan2stream, stdout2stream, or
// stream2stdin.
m.done = make(chan error, 3)
// outchan is used to serialize the output to the stream.
// chan2stream() receives data sent by stdout2outchan() and
// stderr2outchan() and sends it to the stream.
m.outchan = make(chan tunnel.ServerShellPacket)
defer close(m.outchan)
go m.chan2stream()
// Forward data between the shell's stdio and the stream.
go m.stdout2outchan()
if m.stderr != nil {
go m.stderr2outchan()
}
go m.stream2stdin()
// Block until something reports an error.
return <-m.done
}
// chan2stream receives ServerShellPacket from outchan and sends it to stream.
func (m *ioManager) chan2stream() {
for packet := range m.outchan {
if err := m.stream.Send(packet); err != nil {
m.done <- err
return
}
}
m.done <- io.EOF
}
// stdout2stream reads data from the shell's stdout and sends it to the outchan.
func (m *ioManager) stdout2outchan() {
for {
buf := make([]byte, 2048)
n, err := m.stdout.Read(buf[:])
if err != nil {
vlog.VI(2).Infof("stdout2outchan: %v", err)
m.done <- err
return
}
m.outchan <- tunnel.ServerShellPacket{Stdout: buf[:n]}
}
}
// stderr2stream reads data from the shell's stderr and sends it to the outchan.
func (m *ioManager) stderr2outchan() {
for {
buf := make([]byte, 2048)
n, err := m.stderr.Read(buf[:])
if err != nil {
vlog.VI(2).Infof("stderr2outchan: %v", err)
m.done <- err
return
}
m.outchan <- tunnel.ServerShellPacket{Stderr: buf[:n]}
}
}
// stream2stdin reads data from the stream and sends it to the shell's stdin.
func (m *ioManager) stream2stdin() {
for m.stream.Advance() {
packet := m.stream.Value()
if len(packet.Stdin) > 0 {
if n, err := m.stdin.Write(packet.Stdin); n != len(packet.Stdin) || err != nil {
m.done <- fmt.Errorf("stdin.Write returned (%d, %v) want (%d, nil)", n, err, len(packet.Stdin))
return
}
}
if packet.Rows > 0 && packet.Cols > 0 && m.ptyFd != 0 {
setWindowSize(m.ptyFd, packet.Rows, packet.Cols)
}
}
err := m.stream.Err()
if err == nil {
err = io.EOF
}
vlog.VI(2).Infof("stream2stdin: %v", err)
m.done <- err
}