veyron/services/mgmt/logreader/impl: Add glob for logs directory.
This change adds a Globbable implementation to find the list of files in
a logs directory.
There is also some minor refactoring.
Change-Id: Iaded5cfc43d2dc49a19c3160a7ecfecc1cfcf77e
diff --git a/services/mgmt/logreader/impl/logfile_invoker_test.go b/services/mgmt/logreader/impl/logfile_invoker_test.go
new file mode 100644
index 0000000..2b79268
--- /dev/null
+++ b/services/mgmt/logreader/impl/logfile_invoker_test.go
@@ -0,0 +1,195 @@
+package impl_test
+
+import (
+ "io/ioutil"
+ "os"
+ "path"
+ "testing"
+
+ "veyron/services/mgmt/logreader/impl"
+
+ "veyron2/ipc"
+ "veyron2/naming"
+ "veyron2/rt"
+ "veyron2/security"
+ "veyron2/services/mgmt/logreader"
+ "veyron2/verror"
+)
+
+// logFileDispatcher is the test dispatcher handed to startServer. Its Lookup
+// method builds a log file invoker for whatever suffix is requested.
+type logFileDispatcher struct {
+ // root is passed to impl.NewLogFileInvoker together with the requested
+ // suffix; the tests in this file set it to a temporary directory.
+ root string
+}
+
+// Lookup builds a reflection-based invoker around a LogFile server stub for
+// the given suffix under d.root. It never returns an authorizer, and the
+// returned error is always nil.
+func (d *logFileDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	logFile := impl.NewLogFileInvoker(d.root, suffix)
+	stub := logreader.NewServerLogFile(logFile)
+	return ipc.ReflectInvoker(stub), nil, nil
+}
+
+// writeAndSync writes s to w and then calls w.Sync, aborting the test through
+// t.Fatalf as soon as either step reports an error.
+func writeAndSync(t *testing.T, w *os.File, s string) {
+	_, writeErr := w.WriteString(s)
+	if writeErr != nil {
+		t.Fatalf("w.WriteString failed: %v", writeErr)
+	}
+	syncErr := w.Sync()
+	if syncErr != nil {
+		t.Fatalf("w.Sync failed: %v", syncErr)
+	}
+}
+
+// TestReadLogImplNoFollow starts a server backed by logFileDispatcher on a
+// temporary directory and exercises Size and ReadLog (follow disabled) on a
+// log file that is completely written before it is read. It checks that:
+//   - Size on a file that does not exist fails with verror.NotFound,
+//   - every line comes back in order, at the expected byte position,
+//   - exactly len(tests) entries are received,
+//   - Finish reports the offset just past the last line, and
+//   - a second read starting at that offset fails with logreader.EOF.
+func TestReadLogImplNoFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	// Release the descriptor when the test ends. Deferred calls run in LIFO
+	// order, so the file is closed before the server is stopped and before
+	// the work directory is removed.
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+	for _, s := range tests {
+		writeAndSync(t, writer, s+"\n")
+	}
+
+	// Try to access a file that doesn't exist.
+	// Bind failures are fatal: lf is dereferenced immediately afterwards.
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//doesntexist"))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	_, err = lf.Size(runtime.NewContext())
+	if expected := verror.NotFound; !verror.Is(err, expected) {
+		t.Errorf("unexpected error value, got %v, want: %v", err, expected)
+	}
+
+	// Try to access a file that does exist.
+	lf, err = logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	_, err = lf.Size(runtime.NewContext())
+	if err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read without follow.
+	// A failed ReadLog is fatal: the stream is used right away.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	count := 0
+	expectedPosition := int64(0)
+	for ; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != expectedPosition {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, expectedPosition)
+		}
+		// Guard the index so that a server returning too many entries
+		// produces a test failure instead of an out-of-range panic.
+		if count >= len(tests) {
+			t.Errorf("unexpected extra entry: %q", entry.Line)
+		} else if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		// Each line was written with a trailing "\n", hence the +1.
+		expectedPosition += int64(len(entry.Line)) + 1
+	}
+	if count != len(tests) {
+		t.Errorf("unexpected number of entries. Got %v, want %v", count, len(tests))
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	offset, err := stream.Finish()
+	if err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+	if offset != expectedPosition {
+		t.Errorf("unexpected offset. Got %v, want %v", offset, expectedPosition)
+	}
+
+	// Read without follow from EOF (where the previous read ended).
+	stream, err = lf.ReadLog(runtime.NewContext(), offset, logreader.AllEntries, false)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	_, err = stream.Finish()
+	if !verror.Is(err, logreader.EOF) {
+		t.Errorf("unexpected error, got %#v, want EOF", err)
+	}
+}
+
+// TestReadLogImplWithFollow starts a server backed by logFileDispatcher on a
+// temporary directory and reads a log file with follow enabled while the file
+// is still being written. ReadLog is asked for len(tests) entries starting at
+// offset 0; the first line is written after the stream is opened, and each
+// following line is written only once the previous one has been received.
+// The test checks the position and content of every entry, that exactly
+// len(tests) entries arrive, and that the stream and Finish report no error.
+func TestReadLogImplWithFollow(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	const testFile = "mylogfile.INFO"
+	writer, err := os.Create(path.Join(workdir, testFile))
+	if err != nil {
+		t.Fatalf("Create failed: %v", err)
+	}
+	// Release the descriptor when the test ends. Deferred calls run in LIFO
+	// order, so the file is closed before the server is stopped and before
+	// the work directory is removed.
+	defer writer.Close()
+
+	tests := []string{
+		"Hello World!",
+		"Life is too short",
+		"Have fun",
+		"Play hard",
+		"Break something",
+		"Fix it later",
+	}
+
+	// Bind failures are fatal: lf is dereferenced immediately afterwards.
+	lf, err := logreader.BindLogFile(naming.JoinAddressName(endpoint, "//"+testFile))
+	if err != nil {
+		t.Fatalf("BindLogFile: %v", err)
+	}
+	_, err = lf.Size(runtime.NewContext())
+	if err != nil {
+		t.Errorf("Size failed: %v", err)
+	}
+
+	// Read with follow.
+	// A failed ReadLog is fatal: the stream is used right away.
+	stream, err := lf.ReadLog(runtime.NewContext(), 0, int32(len(tests)), true)
+	if err != nil {
+		t.Fatalf("ReadLog failed: %v", err)
+	}
+	rStream := stream.RecvStream()
+	// Seed the file with the first line; the loop below writes the rest one
+	// at a time, after each preceding entry has been received.
+	writeAndSync(t, writer, tests[0]+"\n")
+	count, pos := 0, int64(0)
+	for ; rStream.Advance(); count++ {
+		entry := rStream.Value()
+		if entry.Position != pos {
+			t.Errorf("unexpected position. Got %v, want %v", entry.Position, pos)
+		}
+		// Guard the index so that a server returning too many entries
+		// produces a test failure instead of an out-of-range panic.
+		if count >= len(tests) {
+			t.Errorf("unexpected extra entry: %q", entry.Line)
+		} else if expected := tests[count]; entry.Line != expected {
+			t.Errorf("unexpected content. Got %q, want %q", entry.Line, expected)
+		}
+		// Each line was written with a trailing "\n", hence the +1.
+		pos += int64(len(entry.Line)) + 1
+		if count+1 < len(tests) {
+			writeAndSync(t, writer, tests[count+1]+"\n")
+		}
+	}
+	if count != len(tests) {
+		t.Errorf("unexpected number of entries. Got %v, want %v", count, len(tests))
+	}
+
+	if err := rStream.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", err)
+	}
+	_, err = stream.Finish()
+	if err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+}