veyron/services/mgmt/logreader/impl: Add glob for logs directory.

This change adds a Globbable implementation to find the list of files in
a logs directory.

There is also some minor refactoring.

Change-Id: Iaded5cfc43d2dc49a19c3160a7ecfecc1cfcf77e
diff --git a/services/mgmt/logreader/impl/logfile_invoker_test.go b/services/mgmt/logreader/impl/logfile_invoker_test.go
new file mode 100644
index 0000000..2b79268
--- /dev/null
+++ b/services/mgmt/logreader/impl/logfile_invoker_test.go
@@ -0,0 +1,195 @@
+package impl_test
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"testing"
+
+	"veyron/services/mgmt/logreader/impl"
+
+	"veyron2/ipc"
+	"veyron2/naming"
+	"veyron2/rt"
+	"veyron2/security"
+	"veyron2/services/mgmt/logreader"
+	"veyron2/verror"
+)
+
+type logFileDispatcher struct { // test dispatcher whose Lookup builds a log file invoker per suffix
+	root string // passed as the first argument to impl.NewLogFileInvoker
+}
+
+// Lookup returns a reflect invoker wrapping the log file invoker built from d.root and suffix; the authorizer is nil.
+func (d *logFileDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	return ipc.ReflectInvoker(logreader.NewServerLogFile(impl.NewLogFileInvoker(d.root, suffix))), nil, nil
+}
+
+// writeAndSync writes s to w and then calls w.Sync, aborting the test if either step returns an error.
+func writeAndSync(t *testing.T, w *os.File, s string) {
+	if _, err := w.WriteString(s); err != nil {
+		t.Fatalf("w.WriteString failed: %v", err)
+	} else if err = w.Sync(); err != nil {
+		t.Fatalf("w.Sync failed: %v", err)
+	}
+}
+
+// TestReadLogImplNoFollow exercises Size and ReadLog with follow=false against a log file in a temp directory.
+func TestReadLogImplNoFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+	for _, s := range tests {
+		writeAndSync(t, writer, s+"\n")
+	}
+
+	// Try to access a file that doesn't exist.
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//doesntexist"))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err) // fatal: lf is used immediately below
+	}
+	_, err = lf.Size(runtime.NewContext())
+	if expected := verror.NotFound; !verror.Is(err, expected) {
+		t.Errorf("unexpected error value, got %v, want: %v", err, expected)
+	}
+
+	// Try to access a file that does exist.
+	lf, err = logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	if _, err := lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read without follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err) // fatal: stream is used immediately below
+	}
+	rStream := stream.RecvStream()
+	expectedPosition := int64(0)
+	for count := 0; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != expectedPosition {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, expectedPosition)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		expectedPosition += int64(len(entry.Line)) + 1 // +1 for the "\n" written after each line
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	offset, err := stream.Finish()
+	if err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+	if offset != expectedPosition {
+		t.Errorf("unexpected offset. Got %v, want %v", offset, expectedPosition)
+	}
+
+	// Read without follow from EOF (where the previous read ended); Finish is expected to report EOF.
+	stream, err = lf.ReadLog(runtime.NewContext(), offset, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	if _, err := stream.Finish(); !verror.Is(err, logreader.EOF) {
+		t.Errorf("unexpected error, got %#v, want EOF", err)
+	}
+}
+
+// TestReadLogImplWithFollow exercises ReadLog with follow=true while lines are appended to the log file one at a time.
+func TestReadLogImplWithFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err) // fatal: lf is used immediately below
+	}
+	if _, err := lf.Size(runtime.NewContext()); err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read with follow.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, int32(len(tests)), true)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err) // fatal: stream is used immediately below
+	}
+	rStream := stream.RecvStream()
+	writeAndSync(t, writer, tests[0]+"\n") // seed the first line; each later line is written after the previous one arrives
+	for count, pos := 0, int64(0); rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != pos {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, pos)
+		}
+		if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		pos += int64(len(entry.Line)) + 1 // +1 for the "\n" written after each line
+		if count+1 < len(tests) {
+			writeAndSync(t, writer, tests[count+1]+"\n")
+		}
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	if _, err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+}