blob: 97d1272319b6532a157e0580090d78ebfe4d129a [file] [log] [blame]
Robin Thellend63bd6572014-08-13 17:31:51 -07001package impl
2
3import (
4 "bytes"
5 "io"
6 "strings"
7 "time"
8
Jiri Simsa519c5072014-09-17 21:37:57 -07009 "veyron.io/veyron/veyron2/ipc"
Robin Thellend63bd6572014-08-13 17:31:51 -070010)
11
12// followReader implements the functionality of io.Reader, plus:
13// - it can block for new input when the end of the file is reached, and
14// - it aborts when the parent RPC is canceled.
15type followReader struct {
16 reader io.ReadSeeker
Robin Thellendbde278d2014-11-19 15:07:32 -080017 ctx ipc.ServerContext
Robin Thellend63bd6572014-08-13 17:31:51 -070018 offset int64
19 follow bool
20 err error
21 buf []byte
22}
23
24// newFollowReader is the factory for followReader.
Robin Thellendbde278d2014-11-19 15:07:32 -080025func newFollowReader(ctx ipc.ServerContext, reader io.ReadSeeker, startpos int64, follow bool) *followReader {
Robin Thellend63bd6572014-08-13 17:31:51 -070026 _, err := reader.Seek(startpos, 0)
27 return &followReader{
28 reader: reader,
Robin Thellendbde278d2014-11-19 15:07:32 -080029 ctx: ctx,
Robin Thellend63bd6572014-08-13 17:31:51 -070030 offset: startpos,
31 follow: follow,
32 err: err,
33 }
34}
35
36// tell returns the offset where the next read will start.
37func (f *followReader) tell() int64 {
38 return f.offset
39}
40
41func (f *followReader) read(b []byte) (int, error) {
42 if f.err != nil {
43 return 0, f.err
44 }
45 for {
Robin Thellendbde278d2014-11-19 15:07:32 -080046 if f.ctx != nil {
Matt Rosencrantz137b8d22014-08-18 09:56:15 -070047 select {
Robin Thellendbde278d2014-11-19 15:07:32 -080048 case <-f.ctx.Done():
Matt Rosencrantz137b8d22014-08-18 09:56:15 -070049 return 0, errCanceled
50 default:
51 }
Robin Thellend63bd6572014-08-13 17:31:51 -070052 }
53 n, err := f.reader.Read(b)
54 if n == 0 && err == nil {
55 // According to http://golang.org/pkg/io/#Reader, this
56 // weird case should be treated as a no-op.
57 continue
58 }
59 if n > 0 && err == io.EOF {
60 err = nil
61 }
62 if err == io.EOF && f.follow {
63 time.Sleep(500 * time.Millisecond)
64 continue
65 }
66 return n, err
67 }
68}
69
70// readLine returns a whole line as a string, and the offset where it starts in
71// the file. White spaces are removed from the beginning and the end of the line.
72// If readLine returns an error, the other two return values should be discarded.
73func (f *followReader) readLine() (string, int64, error) {
74 startOff := f.offset
75 var off int
76 for {
77 off = bytes.IndexByte(f.buf, '\n') + 1
78 if off != 0 {
79 break
80 }
81 b := make([]byte, 2048)
82 n, err := f.read(b)
83 if n > 0 {
84 f.buf = append(f.buf, b[:n]...)
85 continue
86 }
87 return "", 0, err
88 }
89 line := f.buf[:off-1] // -1 to remove the trailing \n
90 f.buf = f.buf[off:]
91 f.offset += int64(off)
92 return strings.TrimSpace(string(line)), startOff, nil
93}