veyron/services/mgmt/logreader: Add LogFile service implementation.

This change adds an implementation of the LogFile interface, which can
be used to access log files remotely. It is not yet used anywhere other than
tests.

Change-Id: If0b87f08ff4cfa877dd5e4503687865fa6c50ac8
diff --git a/services/mgmt/logreader/impl/invoker_test.go b/services/mgmt/logreader/impl/invoker_test.go
new file mode 100644
index 0000000..ffa19f9
--- /dev/null
+++ b/services/mgmt/logreader/impl/invoker_test.go
@@ -0,0 +1,220 @@
+package impl_test
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	"veyron/services/mgmt/logreader/impl"
+
+	"veyron2/ipc"
+	"veyron2/naming"
+	"veyron2/rt"
+	"veyron2/security"
+	"veyron2/services/mgmt/logreader"
+	"veyron2/verror"
+)
+
+// dispatcher serves LogFile invokers; root is handed to impl.NewInvoker with each suffix.
+type dispatcher struct{ root string }
+
+// Lookup wraps a new invoker for suffix in a reflection-based ipc.Invoker; the authorizer is nil.
+func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	stub := logreader.NewServerLogFile(impl.NewInvoker(d.root, suffix))
+	return ipc.ReflectInvoker(stub), nil, nil
+}
+
+// startServer creates a server, listens on a local TCP port and serves a
+// dispatcher for root. Any failure aborts the test through t.Fatalf, so the
+// returned error is always nil; it is kept so existing callers still compile.
+func startServer(t *testing.T, root string) (ipc.Server, string, error) {
+	disp := &dispatcher{root}
+	server, err := rt.R().NewServer()
+	if err != nil {
+		t.Fatalf("NewServer failed: %v", err)
+	}
+	endpoint, err := server.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Listen failed: %v", err)
+	}
+	if err := server.Serve("", disp); err != nil {
+		t.Fatalf("Serve failed: %v", err)
+	}
+	return server, endpoint.String(), nil
+}
+
+func stopServer(t *testing.T, server ipc.Server) {
+	if stopErr := server.Stop(); stopErr != nil { // reported, but does not abort the test
+		t.Errorf("server.Stop failed: %v", stopErr)
+	}
+}
+
+func writeAndSync(t *testing.T, w *os.File, s string) {
+	if _, writeErr := w.WriteString(s); writeErr != nil {
+		t.Fatalf("w.WriteString failed: %v", writeErr)
+	}
+	if syncErr := w.Sync(); syncErr != nil { // commit s to disk before returning
+		t.Fatalf("w.Sync failed: %v", syncErr)
+	}
+}
+
+// TestReadLogImplNoFollow reads a log file without follow: a missing file must
+// yield NotFound, entries must stream in order, and a read at EOF must yield EOF.
+func TestReadLogImplNoFollow(t *testing.T) {
+	runtime := rt.Init()
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, workdir)
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!", "Life is too short", "Have fun",
+		"Play hard", "Break something", "Fix it later",
+	}
+	for _, s := range tests {
+		writeAndSync(t, writer, s+"\n")
+	}
+
+	// Try to access a file that doesn't exist.
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//doesntexist"))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err := lf.Size(runtime.NewContext()); !verror.Is(err, verror.NotFound) {
+		t.Errorf("unexpected error value, got %v, want: %v", err, verror.NotFound)
+	}
+
+	// Try to access a file that does exist.
+	lf, err = logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err := lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read without follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	count, expectedPosition := 0, int64(0)
+	for ; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if count >= len(tests) {
+			t.Fatalf("unexpected extra entry %q at position %v", entry.Line, entry.Position)
+		}
+		if entry.Position != expectedPosition {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, expectedPosition)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		expectedPosition += int64(len(entry.Line)) + 1 // +1 for the "\n" written after each line
+	}
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	if count != len(tests) {
+		t.Errorf("unexpected number of entries. Got %d, want %d", count, len(tests))
+	}
+	offset, err := stream.Finish()
+	if err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+	if offset != expectedPosition {
+		t.Errorf("unexpected offset. Got %v, want %v", offset, expectedPosition)
+	}
+
+	// Read without follow from EOF (where the previous read ended).
+	stream, err = lf.ReadLog(runtime.NewContext(), offset, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	if _, err := stream.Finish(); !verror.Is(err, logreader.EOF) {
+		t.Errorf("unexpected error, got %#v, want EOF", err)
+	}
+}
+
+// TestReadLogImplWithFollow reads with follow while the test appends one line per received entry.
+func TestReadLogImplWithFollow(t *testing.T) {
+	runtime := rt.Init()
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, workdir)
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+	tests := []string{
+		"Hello World!", "Life is too short", "Have fun",
+		"Play hard", "Break something", "Fix it later",
+	}
+
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err := lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read with follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, int32(len(tests)), true)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	writeAndSync(t, writer, tests[0]+"\n")
+	count, pos := 0, int64(0)
+	for ; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if count >= len(tests) {
+			t.Fatalf("unexpected extra entry %q at position %v", entry.Line, entry.Position)
+		}
+		if entry.Position != pos {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, pos)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		pos += int64(len(entry.Line)) + 1
+		if count+1 < len(tests) {
+			writeAndSync(t, writer, tests[count+1]+"\n")
+		}
+	}
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	if count != len(tests) {
+		t.Errorf("unexpected number of entries. Got %d, want %d", count, len(tests))
+	}
+	if _, err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+}