veyron/services/mgmt/logreader: Add LogFile service implementation.

This change adds an implementation of the LogFile interface, which can
be used to access log files remotely. It is not yet used anywhere other than
tests.

Change-Id: If0b87f08ff4cfa877dd5e4503687865fa6c50ac8
diff --git a/services/mgmt/logreader/impl/invoker.go b/services/mgmt/logreader/impl/invoker.go
new file mode 100644
index 0000000..25f58a7
--- /dev/null
+++ b/services/mgmt/logreader/impl/invoker.go
@@ -0,0 +1,111 @@
+// Package impl implements the LogFile interface from
+// veyron2/services/mgmt/logreader. It can be used to allow remote access to
+// log files.
+package impl
+
+import (
+	"io"
+	"math"
+	"os"
+	"path"
+	"strings"
+
+	"veyron2/ipc"
+	"veyron2/services/mgmt/logreader"
+	"veyron2/verror"
+	"veyron2/vlog"
+)
+
+// invoker holds the per-invocation state of the LogFile service.
+type invoker struct {
+	// root is the cleaned root directory that object names are relative to.
+	root string
+	// suffix is the suffix of the current invocation; it is joined to root
+	// as a relative object name to identify a log file.
+	suffix string
+}
+
+var (
+	errCanceled        = verror.Abortedf("operation canceled")  // the RPC context was closed while reading
+	errNotFound        = verror.NotFoundf("log file not found") // the object name matches no existing file
+	errEOF             = verror.Make(logreader.EOF, "EOF")      // end of file reached before any entry was sent
+	errOperationFailed = verror.Internalf("operation failed")   // any other failure, including rejected paths
+)
+
+// NewInvoker creates the invoker for the log file named by suffix under root.
+func NewInvoker(root, suffix string) *invoker {
+	inv := &invoker{suffix: suffix}
+	// Store root in cleaned form; fileName compares joined paths against it.
+	inv.root = path.Clean(root)
+	return inv
+}
+
+// fileName returns the file name that corresponds to the object name. It
+// fails if the suffix resolves to a path outside of the root directory.
+func (i *invoker) fileName() (string, error) {
+	p := path.Join(i.root, i.suffix)
+	// path.Join() collapses "../", so p must be root itself or live below
+	// "root/". A bare prefix test would also accept siblings like "root-x".
+	if p != i.root && !strings.HasPrefix(p, strings.TrimSuffix(i.root, "/")+"/") {
+		return "", errOperationFailed
+	}
+	return p, nil
+}
+
+// Size reports the size, in bytes, of the log file named by the suffix.
+func (i *invoker) Size(context ipc.ServerContext) (int64, error) {
+	vlog.VI(1).Infof("%v.Size()", i.suffix)
+	name, err := i.fileName()
+	if err != nil {
+		return 0, err
+	}
+	info, statErr := os.Stat(name)
+	switch {
+	case statErr == nil && info.IsDir():
+		// Directories are not log files.
+		return 0, errOperationFailed
+	case statErr == nil:
+		return info.Size(), nil
+	case os.IsNotExist(statErr):
+		return 0, errNotFound
+	}
+	vlog.Errorf("Stat(%v) failed: %v", name, statErr)
+	return 0, errOperationFailed
+}
+
+// ReadLog streams up to numEntries entries, starting at offset startpos, and returns the offset where the next read should start.
+func (i *invoker) ReadLog(context ipc.ServerContext, startpos int64, numEntries int32, follow bool, stream logreader.LogFileServiceReadLogStream) (int64, error) {
+	vlog.VI(1).Infof("%v.ReadLog(%v, %v, %v)", i.suffix, startpos, numEntries, follow)
+	fname, err := i.fileName()
+	if err != nil {
+		return 0, err
+	}
+	f, err := os.Open(fname)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, errNotFound
+		}
+		return 0, errOperationFailed
+	}
+	defer f.Close() // release the descriptor on every return path
+	reader := newFollowReader(context, f, startpos, follow)
+	if numEntries == logreader.AllEntries {
+		numEntries = int32(math.MaxInt32)
+	}
+	sender := stream.SendStream()
+	for n := int32(0); n < numEntries; n++ {
+		line, offset, err := reader.readLine()
+		switch {
+		case err == io.EOF && n > 0:
+			return reader.tell(), nil
+		case err == io.EOF: // end of file before any entry was sent
+			return reader.tell(), errEOF
+		case err != nil:
+			return reader.tell(), errOperationFailed
+		}
+		if err := sender.Send(logreader.LogEntry{Position: offset, Line: line}); err != nil {
+			return reader.tell(), err
+		}
+	}
+	return reader.tell(), nil
+}
diff --git a/services/mgmt/logreader/impl/invoker_test.go b/services/mgmt/logreader/impl/invoker_test.go
new file mode 100644
index 0000000..ffa19f9
--- /dev/null
+++ b/services/mgmt/logreader/impl/invoker_test.go
@@ -0,0 +1,220 @@
+package impl_test
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	"veyron/services/mgmt/logreader/impl"
+
+	"veyron2/ipc"
+	"veyron2/naming"
+	"veyron2/rt"
+	"veyron2/security"
+	"veyron2/services/mgmt/logreader"
+	"veyron2/verror"
+)
+
+type dispatcher struct {
+	root string // directory handed to every invoker as its root
+}
+
+func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	logFile := impl.NewInvoker(d.root, suffix)
+	return ipc.ReflectInvoker(logreader.NewServerLogFile(logFile)), nil, nil
+}
+
+// startServer serves the log files below root on "localhost:0" and returns
+// the server and its endpoint. Failures abort the test via t.Fatalf, so the
+// returned error is always nil.
+func startServer(t *testing.T, root string) (ipc.Server, string, error) {
+	disp := &dispatcher{root}
+	server, err := rt.R().NewServer()
+	if err != nil {
+		t.Fatalf("NewServer failed: %v", err)
+	}
+	endpoint, err := server.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Listen failed: %v", err)
+	}
+	if err := server.Serve("", disp); err != nil {
+		t.Fatalf("Serve failed: %v", err)
+	}
+	return server, endpoint.String(), nil
+}
+
+func stopServer(t *testing.T, server ipc.Server) {
+	if err := server.Stop(); err != nil {
+		t.Errorf("server.Stop failed: %v", err) // report the failure without aborting the test
+	}
+}
+
+// writeAndSync writes s to w and syncs the file, failing the test on error.
+func writeAndSync(t *testing.T, w *os.File, s string) {
+	if _, err := w.WriteString(s); err != nil {
+		t.Fatalf("w.WriteString failed: %v", err)
+	} else if err := w.Sync(); err != nil {
+		t.Fatalf("w.Sync failed: %v", err)
+	}
+}
+
+// TestReadLogImplNoFollow checks Size and ReadLog without follow, over RPC.
+func TestReadLogImplNoFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, workdir)
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+	for _, s := range tests {
+		writeAndSync(t, writer, s+"\n")
+	}
+
+	// Try to access a file that doesn't exist.
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//doesntexist"))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	_, err = lf.Size(runtime.NewContext())
+	if expected := verror.NotFound; !verror.Is(err, expected) {
+		t.Errorf("unexpected error value, got %v, want: %v", err, expected)
+	}
+
+	// Try to access a file that does exist.
+	lf, err = logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err = lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read without follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	expectedPosition := int64(0)
+	for count := 0; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != expectedPosition {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, expectedPosition)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		expectedPosition += int64(len(entry.Line)) + 1
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	offset, err := stream.Finish()
+	if err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+	if offset != expectedPosition {
+		t.Errorf("unexpected offset. Got %v, want %v", offset, expectedPosition)
+	}
+
+	// Read again from EOF (where the previous read ended), still without follow.
+	stream, err = lf.ReadLog(runtime.NewContext(), offset, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	if _, err = stream.Finish(); !verror.Is(err, logreader.EOF) {
+		t.Errorf("unexpected error, got %#v, want EOF", err)
+	}
+}
+
+// TestReadLogImplWithFollow checks ReadLog with follow while the file grows.
+func TestReadLogImplWithFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, workdir)
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err = lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read with follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, int32(len(tests)), true)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	writeAndSync(t, writer, tests[0]+"\n")
+	for count, pos := 0, int64(0); rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != pos {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, pos)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		pos += int64(len(entry.Line)) + 1
+		if count+1 < len(tests) {
+			writeAndSync(t, writer, tests[count+1]+"\n")
+		}
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	if _, err = stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+}
diff --git a/services/mgmt/logreader/impl/reader.go b/services/mgmt/logreader/impl/reader.go
new file mode 100644
index 0000000..a31cec0
--- /dev/null
+++ b/services/mgmt/logreader/impl/reader.go
@@ -0,0 +1,89 @@
+package impl
+
+import (
+	"bytes"
+	"io"
+	"strings"
+	"time"
+
+	"veyron2/ipc"
+)
+
+// followReader reads lines from a seekable file and, on top of plain reading:
+// - it can block for new input when the end of the file is reached, and
+// - it aborts when the parent RPC is canceled.
+type followReader struct {
+	reader io.ReadSeeker     // source of the data
+	ctx    ipc.ServerContext // polled for cancellation before each read; may be nil
+	offset int64             // offset of the next line that readLine will return
+	follow bool              // wait for more data at EOF instead of returning io.EOF
+	err    error             // error from the initial Seek, returned by every read
+	buf    []byte            // bytes read from reader but not yet returned as lines
+}
+
+// newFollowReader returns a followReader that starts reading at startpos.
+func newFollowReader(ctx ipc.ServerContext, reader io.ReadSeeker, startpos int64, follow bool) *followReader {
+	f := &followReader{
+		reader: reader,
+		ctx:    ctx,
+		offset: startpos,
+		follow: follow,
+	}
+	_, f.err = reader.Seek(startpos, 0) // whence 0: from the start; a failure is returned by read
+	return f
+}
+
+// tell returns the offset just past the last line returned by readLine.
+func (f *followReader) tell() int64 {
+	return f.offset
+}
+
+// read fills b from the file; in follow mode it waits at EOF, and it stops once the RPC is canceled.
+func (f *followReader) read(b []byte) (int, error) {
+	if f.err != nil {
+		return 0, f.err
+	}
+	for {
+		if f.ctx != nil && f.ctx.IsClosed() {
+			return 0, errCanceled
+		}
+		n, err := f.reader.Read(b)
+		switch {
+		case n == 0 && err == nil:
+			// According to http://golang.org/pkg/io/#Reader, this is a no-op.
+			continue
+		case n > 0 && err == io.EOF: // deliver the data and drop the EOF
+			return n, nil
+		case err == io.EOF && f.follow:
+			// Nothing new yet: wait before polling the file again.
+			time.Sleep(500 * time.Millisecond)
+			continue
+		}
+		return n, err
+	}
+}
+
+// readLine returns the next complete line and the offset where it starts in
+// the file. Leading and trailing white space is stripped from the line, but
+// the offset still advances by the line's full length including its "\n".
+// If readLine returns an error, the other two return values should be discarded.
+func (f *followReader) readLine() (string, int64, error) {
+	begin := f.offset
+	nl := bytes.IndexByte(f.buf, '\n')
+	for nl < 0 {
+		// No full line is buffered yet: pull in more data. A trailing
+		// partial line stays in f.buf until its newline arrives.
+		chunk := make([]byte, 2048)
+		n, err := f.read(chunk)
+		if n <= 0 {
+			return "", 0, err
+		}
+		f.buf = append(f.buf, chunk[:n]...)
+		nl = bytes.IndexByte(f.buf, '\n')
+	}
+	text := strings.TrimSpace(string(f.buf[:nl]))
+	// Consume the line together with its newline.
+	f.buf = f.buf[nl+1:]
+	f.offset += int64(nl + 1)
+	return text, begin, nil
+}
diff --git a/services/mgmt/logreader/impl/reader_test.go b/services/mgmt/logreader/impl/reader_test.go
new file mode 100644
index 0000000..fd000a0
--- /dev/null
+++ b/services/mgmt/logreader/impl/reader_test.go
@@ -0,0 +1,136 @@
+package impl
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+)
+
+// writeAndSync writes s to w and syncs the file, failing the test on error.
+func writeAndSync(t *testing.T, w *os.File, s string) {
+	if _, err := w.WriteString(s); err != nil {
+		t.Fatalf("w.WriteString failed: %v", err)
+	} else if err := w.Sync(); err != nil {
+		t.Fatalf("w.Sync failed: %v", err)
+	}
+}
+
+func TestFollowReaderNoFollow(t *testing.T) {
+	w, err := ioutil.TempFile("", "reader-test-")
+	if err != nil {
+		t.Fatalf("ioutil.TempFile: unexpected error: %v", err)
+	}
+	defer w.Close()
+	defer os.Remove(w.Name())
+
+	tests := []string{
+		"Hello world",
+		"Hello world Two",
+		"Hello world Three",
+	}
+	for _, s := range tests {
+		writeAndSync(t, w, s+"\n")
+	}
+	writeAndSync(t, w, "Partial line with no newline") // must never be returned by readLine
+
+	r, err := os.Open(w.Name())
+	if err != nil {
+		t.Fatalf("os.Open: unexpected error: %v", err)
+	}
+	defer r.Close()
+
+	f := newFollowReader(nil, r, 0, false) // nil context: no cancellation; follow disabled
+	if f == nil {
+		t.Fatalf("newFollowReader return nil")
+	}
+
+	var expectedOffset int64
+	for _, s := range tests {
+		line, offset, err := f.readLine()
+		if err != nil {
+			t.Errorf("readLine, unexpected error: %v", err)
+		}
+		if line != s {
+			t.Errorf("unexpected result. Got %v, want %v", line, s)
+		}
+		if offset != expectedOffset {
+			t.Errorf("unexpected result. Got %v, want %v", offset, expectedOffset)
+		}
+		expectedOffset += int64(len(s)) + 1 // +1 for the "\n"
+	}
+
+	// The partial line has no newline, so without follow readLine must report io.EOF.
+	if line, _, err := f.readLine(); line != "" || err != io.EOF {
+		t.Errorf("unexpected result. Got %v:%v, want \"\":EOF", line, err)
+	}
+}
+
+func sleep() {
+	time.Sleep(500 * time.Millisecond) // same interval as the follow-mode wait in followReader.read
+}
+
+func TestFollowReaderWithFollow(t *testing.T) {
+	w, err := ioutil.TempFile("", "reader-test-")
+	if err != nil {
+		t.Fatalf("ioutil.TempFile: unexpected error: %v", err)
+	}
+	defer w.Close()
+	defer os.Remove(w.Name())
+
+	tests := []string{
+		"Hello world",
+		"Hello world Two",
+		"Hello world Three",
+	}
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		// Not the test goroutine: report with t.Errorf, never t.Fatalf.
+		for _, s := range []string{tests[0] + "\n", tests[1] + "\n", tests[2] + "\n", "Hello ", "world ", "Four\n"} {
+			sleep()
+			if _, err := w.WriteString(s); err != nil {
+				t.Errorf("w.WriteString failed: %v", err)
+				return
+			} else if err := w.Sync(); err != nil {
+				t.Errorf("w.Sync failed: %v", err)
+				return
+			}
+		}
+	}()
+	defer func() { <-done }() // let the writer finish before w is closed
+
+	r, err := os.Open(w.Name())
+	if err != nil {
+		t.Fatalf("os.Open: unexpected error: %v", err)
+	}
+	defer r.Close()
+	f := newFollowReader(nil, r, 0, true)
+
+	var expectedOffset int64
+	for _, s := range tests {
+		line, offset, err := f.readLine()
+		if err != nil {
+			t.Errorf("readLine, unexpected error: %v", err)
+		}
+		if line != s {
+			t.Errorf("unexpected result. Got %v, want %v", line, s)
+		}
+		if offset != expectedOffset {
+			t.Errorf("unexpected result. Got %v, want %v", offset, expectedOffset)
+		}
+		expectedOffset += int64(len(s)) + 1
+	}
+
+	line, offset, err := f.readLine()
+	if err != nil {
+		t.Errorf("readLine, unexpected error: %v", err)
+	}
+	if expected := "Hello world Four"; line != expected {
+		t.Errorf("unexpected result. Got %q, want %q", line, expected)
+	}
+	if offset != expectedOffset {
+		t.Errorf("unexpected result. Got %v, want %v", offset, expectedOffset)
+	}
+}