Robin Thellend | 63bd657 | 2014-08-13 17:31:51 -0700 | [diff] [blame] | 1 | package impl |
| 2 | |
| 3 | import ( |
| 4 | "bytes" |
| 5 | "io" |
| 6 | "strings" |
| 7 | "time" |
| 8 | |
Jiri Simsa | 519c507 | 2014-09-17 21:37:57 -0700 | [diff] [blame] | 9 | "veyron.io/veyron/veyron2/ipc" |
Robin Thellend | 63bd657 | 2014-08-13 17:31:51 -0700 | [diff] [blame] | 10 | ) |
| 11 | |
// followReader implements the functionality of io.Reader, plus:
// - it can block for new input when the end of the file is reached, and
// - it aborts when the parent RPC is canceled.
type followReader struct {
	// reader is the underlying data source. newFollowReader seeks it to the
	// starting position and read pulls data from it.
	reader io.ReadSeeker
	// ctx, when non-nil, is polled by read; once its Done channel is ready,
	// read returns errCanceled.
	ctx ipc.ServerContext
	// offset is the position reported by tell. It starts at the requested
	// start position and is advanced by readLine for every line it returns.
	offset int64
	// follow makes read wait and retry on io.EOF instead of returning it.
	follow bool
	// err holds the result of the initial Seek. If it is non-nil, every call
	// to read returns it without touching reader.
	err error
	// buf holds bytes already read from reader but not yet returned by
	// readLine.
	buf []byte
}
| 23 | |
| 24 | // newFollowReader is the factory for followReader. |
Robin Thellend | bde278d | 2014-11-19 15:07:32 -0800 | [diff] [blame] | 25 | func newFollowReader(ctx ipc.ServerContext, reader io.ReadSeeker, startpos int64, follow bool) *followReader { |
Robin Thellend | 63bd657 | 2014-08-13 17:31:51 -0700 | [diff] [blame] | 26 | _, err := reader.Seek(startpos, 0) |
| 27 | return &followReader{ |
| 28 | reader: reader, |
Robin Thellend | bde278d | 2014-11-19 15:07:32 -0800 | [diff] [blame] | 29 | ctx: ctx, |
Robin Thellend | 63bd657 | 2014-08-13 17:31:51 -0700 | [diff] [blame] | 30 | offset: startpos, |
| 31 | follow: follow, |
| 32 | err: err, |
| 33 | } |
| 34 | } |
| 35 | |
| 36 | // tell returns the offset where the next read will start. |
| 37 | func (f *followReader) tell() int64 { |
| 38 | return f.offset |
| 39 | } |
| 40 | |
| 41 | func (f *followReader) read(b []byte) (int, error) { |
| 42 | if f.err != nil { |
| 43 | return 0, f.err |
| 44 | } |
| 45 | for { |
Robin Thellend | bde278d | 2014-11-19 15:07:32 -0800 | [diff] [blame] | 46 | if f.ctx != nil { |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 47 | select { |
Robin Thellend | bde278d | 2014-11-19 15:07:32 -0800 | [diff] [blame] | 48 | case <-f.ctx.Done(): |
Matt Rosencrantz | 137b8d2 | 2014-08-18 09:56:15 -0700 | [diff] [blame] | 49 | return 0, errCanceled |
| 50 | default: |
| 51 | } |
Robin Thellend | 63bd657 | 2014-08-13 17:31:51 -0700 | [diff] [blame] | 52 | } |
| 53 | n, err := f.reader.Read(b) |
| 54 | if n == 0 && err == nil { |
| 55 | // According to http://golang.org/pkg/io/#Reader, this |
| 56 | // weird case should be treated as a no-op. |
| 57 | continue |
| 58 | } |
| 59 | if n > 0 && err == io.EOF { |
| 60 | err = nil |
| 61 | } |
| 62 | if err == io.EOF && f.follow { |
| 63 | time.Sleep(500 * time.Millisecond) |
| 64 | continue |
| 65 | } |
| 66 | return n, err |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | // readLine returns a whole line as a string, and the offset where it starts in |
| 71 | // the file. White spaces are removed from the beginning and the end of the line. |
| 72 | // If readLine returns an error, the other two return values should be discarded. |
| 73 | func (f *followReader) readLine() (string, int64, error) { |
| 74 | startOff := f.offset |
| 75 | var off int |
| 76 | for { |
| 77 | off = bytes.IndexByte(f.buf, '\n') + 1 |
| 78 | if off != 0 { |
| 79 | break |
| 80 | } |
| 81 | b := make([]byte, 2048) |
| 82 | n, err := f.read(b) |
| 83 | if n > 0 { |
| 84 | f.buf = append(f.buf, b[:n]...) |
| 85 | continue |
| 86 | } |
| 87 | return "", 0, err |
| 88 | } |
| 89 | line := f.buf[:off-1] // -1 to remove the trailing \n |
| 90 | f.buf = f.buf[off:] |
| 91 | f.offset += int64(off) |
| 92 | return strings.TrimSpace(string(line)), startOff, nil |
| 93 | } |