blob: 265cc3b59d721dd9397286deb00bdabf7a2a8aad [file] [log] [blame]
Jiri Simsa5293dcb2014-05-10 09:56:38 -07001package impl
2
3import (
4 "fmt"
5 "io"
6
7 "veyron/examples/tunnel"
8 "veyron2/vlog"
9)
10
11func runIOManager(stdin io.Writer, stdout, stderr io.Reader, ptyFd uintptr, stream tunnel.TunnelServiceShellStream) error {
12 m := ioManager{stdin: stdin, stdout: stdout, stderr: stderr, ptyFd: ptyFd, stream: stream}
13 return m.run()
14}
15
16// ioManager manages the forwarding of all the data between the shell and the
17// stream.
18type ioManager struct {
19 stdin io.Writer
20 stdout, stderr io.Reader
21 ptyFd uintptr
22 stream tunnel.TunnelServiceShellStream
23
24 // done receives any error from chan2stream, user2stream, or
25 // stream2user.
26 done chan error
27 // outchan is used to serialize the output to the stream. This is
28 // needed because stream.Send is not thread-safe.
29 outchan chan tunnel.ServerShellPacket
Asim Shankar5659aab2014-07-29 12:41:07 -070030 // closed is closed when run() exits.
31 closed chan struct{}
Jiri Simsa5293dcb2014-05-10 09:56:38 -070032}
33
34func (m *ioManager) run() error {
35 // done receives any error from chan2stream, stdout2stream, or
36 // stream2stdin.
37 m.done = make(chan error, 3)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070038 // outchan is used to serialize the output to the stream.
39 // chan2stream() receives data sent by stdout2outchan() and
40 // stderr2outchan() and sends it to the stream.
41 m.outchan = make(chan tunnel.ServerShellPacket)
Asim Shankar5659aab2014-07-29 12:41:07 -070042 m.closed = make(chan struct{})
43 defer close(m.closed)
Jiri Simsa5293dcb2014-05-10 09:56:38 -070044 go m.chan2stream()
45
46 // Forward data between the shell's stdio and the stream.
47 go m.stdout2outchan()
48 if m.stderr != nil {
49 go m.stderr2outchan()
50 }
51 go m.stream2stdin()
52
53 // Block until something reports an error.
54 return <-m.done
55}
56
57// chan2stream receives ServerShellPacket from outchan and sends it to stream.
58func (m *ioManager) chan2stream() {
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -070059 sender := m.stream.SendStream()
Jiri Simsa5293dcb2014-05-10 09:56:38 -070060 for packet := range m.outchan {
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -070061 if err := sender.Send(packet); err != nil {
Jiri Simsa5293dcb2014-05-10 09:56:38 -070062 m.done <- err
63 return
64 }
65 }
66 m.done <- io.EOF
67}
68
Asim Shankar5659aab2014-07-29 12:41:07 -070069func (m *ioManager) sendOnOutchan(p tunnel.ServerShellPacket) bool {
70 select {
71 case m.outchan <- p:
72 return true
73 case <-m.closed:
74 return false
75 }
76}
77
Jiri Simsa5293dcb2014-05-10 09:56:38 -070078// stdout2stream reads data from the shell's stdout and sends it to the outchan.
79func (m *ioManager) stdout2outchan() {
80 for {
81 buf := make([]byte, 2048)
82 n, err := m.stdout.Read(buf[:])
83 if err != nil {
84 vlog.VI(2).Infof("stdout2outchan: %v", err)
85 m.done <- err
86 return
87 }
Asim Shankar5659aab2014-07-29 12:41:07 -070088 if !m.sendOnOutchan(tunnel.ServerShellPacket{Stdout: buf[:n]}) {
89 return
90 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -070091 }
92}
93
94// stderr2stream reads data from the shell's stderr and sends it to the outchan.
95func (m *ioManager) stderr2outchan() {
96 for {
97 buf := make([]byte, 2048)
98 n, err := m.stderr.Read(buf[:])
99 if err != nil {
100 vlog.VI(2).Infof("stderr2outchan: %v", err)
101 m.done <- err
102 return
103 }
Asim Shankar5659aab2014-07-29 12:41:07 -0700104 if !m.sendOnOutchan(tunnel.ServerShellPacket{Stderr: buf[:n]}) {
105 return
106 }
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700107 }
108}
109
110// stream2stdin reads data from the stream and sends it to the shell's stdin.
111func (m *ioManager) stream2stdin() {
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700112 rStream := m.stream.RecvStream()
113 for rStream.Advance() {
114 packet := rStream.Value()
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700115 if len(packet.Stdin) > 0 {
116 if n, err := m.stdin.Write(packet.Stdin); n != len(packet.Stdin) || err != nil {
117 m.done <- fmt.Errorf("stdin.Write returned (%d, %v) want (%d, nil)", n, err, len(packet.Stdin))
118 return
119 }
120 }
121 if packet.Rows > 0 && packet.Cols > 0 && m.ptyFd != 0 {
122 setWindowSize(m.ptyFd, packet.Rows, packet.Cols)
123 }
124 }
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700125
Shyam Jayaraman97b9dca2014-07-31 13:30:46 -0700126 err := rStream.Err()
Shyam Jayaramanc4aed6e2014-07-22 14:25:06 -0700127 if err == nil {
128 err = io.EOF
129 }
130
131 vlog.VI(2).Infof("stream2stdin: %v", err)
132 m.done <- err
Jiri Simsa5293dcb2014-05-10 09:56:38 -0700133}