veyron/services/mgmt/logreader/impl: Add glob for logs directory.

This change adds a Globbable implementation to find the list of files in
a logs directory.

There is also some minor refactoring.

Change-Id: Iaded5cfc43d2dc49a19c3160a7ecfecc1cfcf77e
diff --git a/services/mgmt/logreader/impl/common.go b/services/mgmt/logreader/impl/common.go
new file mode 100644
index 0000000..ccc98b9
--- /dev/null
+++ b/services/mgmt/logreader/impl/common.go
@@ -0,0 +1,33 @@
+// Package impl implements the LogFile interface from
+// veyron2/services/mgmt/logreader, which can be used to allow remote access to
+// log files, and the Globbable interface from veyron2/services/mounttable to
+// find the files in a logs directory.
+package impl
+
+import (
+	"path"
+	"strings"
+
+	"veyron2/services/mgmt/logreader"
+	"veyron2/verror"
+)
+
+var (
+	errCanceled        = verror.Abortedf("operation canceled")
+	errNotFound        = verror.NotFoundf("log file not found")
+	errEOF             = verror.Make(logreader.EOF, "EOF")
+	errOperationFailed = verror.Internalf("operation failed")
+)
+
+// translateNameToFilename returns the file name that corresponds to the object
+// name.
+func translateNameToFilename(root, name string) (string, error) {
+	p := path.Join(root, name)
+	// Make sure we're not asked to read a file outside of the root
+	// directory. This could happen if suffix contains "../", which get
+	// collapsed by path.Join().
+	if !strings.HasPrefix(p, root) {
+		return "", errOperationFailed
+	}
+	return p, nil
+}
diff --git a/services/mgmt/logreader/impl/common_test.go b/services/mgmt/logreader/impl/common_test.go
new file mode 100644
index 0000000..991d833
--- /dev/null
+++ b/services/mgmt/logreader/impl/common_test.go
@@ -0,0 +1,31 @@
+package impl_test
+
+import (
+	"testing"
+	"veyron2/ipc"
+	"veyron2/rt"
+)
+
+func startServer(t *testing.T, disp ipc.Dispatcher) (ipc.Server, string, error) {
+	server, err := rt.R().NewServer()
+	if err != nil {
+		t.Fatalf("NewServer failed: %v", err)
+		return nil, "", err
+	}
+	endpoint, err := server.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Listen failed: %v", err)
+		return nil, "", err
+	}
+	if err := server.Serve("", disp); err != nil {
+		t.Fatalf("Serve failed: %v", err)
+		return nil, "", err
+	}
+	return server, endpoint.String(), nil
+}
+
+func stopServer(t *testing.T, server ipc.Server) {
+	if err := server.Stop(); err != nil {
+		t.Errorf("server.Stop failed: %v", err)
+	}
+}
diff --git a/services/mgmt/logreader/impl/invoker.go b/services/mgmt/logreader/impl/invoker.go
deleted file mode 100644
index 25f58a7..0000000
--- a/services/mgmt/logreader/impl/invoker.go
+++ /dev/null
@@ -1,111 +0,0 @@
-// Package impl implements the LogFile interface from
-// veyron2/services/mgmt/logreader. It can be used to allow remote access to
-// log files.
-package impl
-
-import (
-	"io"
-	"math"
-	"os"
-	"path"
-	"strings"
-
-	"veyron2/ipc"
-	"veyron2/services/mgmt/logreader"
-	"veyron2/verror"
-	"veyron2/vlog"
-)
-
-// invoker holds the state of a logfile invocation.
-type invoker struct {
-	// root is the root directory from which the object names are based.
-	root string
-	// suffix is the suffix of the current invocation that is assumed to
-	// be used as a relative object name to identify a log file.
-	suffix string
-}
-
-var (
-	errCanceled        = verror.Abortedf("operation canceled")
-	errNotFound        = verror.NotFoundf("log file not found")
-	errEOF             = verror.Make(logreader.EOF, "EOF")
-	errOperationFailed = verror.Internalf("operation failed")
-)
-
-// NewInvoker is the invoker factory.
-func NewInvoker(root, suffix string) *invoker {
-	return &invoker{
-		root:   path.Clean(root),
-		suffix: suffix,
-	}
-}
-
-// fileName returns the file name that corresponds to the object name.
-func (i *invoker) fileName() (string, error) {
-	p := path.Join(i.root, i.suffix)
-	// Make sure we're not asked to read a file outside of the root
-	// directory. This could happen if suffix contains "../", which get
-	// collapsed by path.Join().
-	if !strings.HasPrefix(p, i.root) {
-		return "", errOperationFailed
-	}
-	return p, nil
-}
-
-// Size returns the size of the log file, in bytes.
-func (i *invoker) Size(context ipc.ServerContext) (int64, error) {
-	vlog.VI(1).Infof("%v.Size()", i.suffix)
-	fname, err := i.fileName()
-	if err != nil {
-		return 0, err
-	}
-	fi, err := os.Stat(fname)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return 0, errNotFound
-		}
-		vlog.Errorf("Stat(%v) failed: %v", fname, err)
-		return 0, errOperationFailed
-	}
-	if fi.IsDir() {
-		return 0, errOperationFailed
-	}
-	return fi.Size(), nil
-}
-
-// ReadLog returns log entries from the log file.
-func (i *invoker) ReadLog(context ipc.ServerContext, startpos int64, numEntries int32, follow bool, stream logreader.LogFileServiceReadLogStream) (int64, error) {
-	vlog.VI(1).Infof("%v.ReadLog(%v, %v, %v)", i.suffix, startpos, numEntries, follow)
-	fname, err := i.fileName()
-	if err != nil {
-		return 0, err
-	}
-	f, err := os.Open(fname)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return 0, errNotFound
-		}
-		return 0, errOperationFailed
-	}
-	reader := newFollowReader(context, f, startpos, follow)
-	if numEntries == logreader.AllEntries {
-		numEntries = int32(math.MaxInt32)
-	}
-	sender := stream.SendStream()
-	for n := int32(0); n < numEntries; n++ {
-		line, offset, err := reader.readLine()
-		if err == io.EOF && n > 0 {
-			return reader.tell(), nil
-		}
-		if err == io.EOF {
-			return reader.tell(), errEOF
-		}
-		if err != nil {
-			return reader.tell(), errOperationFailed
-		}
-		if err := sender.Send(logreader.LogEntry{Position: offset, Line: line}); err != nil {
-			return reader.tell(), err
-		}
-	}
-	return reader.tell(), nil
-}
diff --git a/services/mgmt/logreader/impl/logdir_invoker.go b/services/mgmt/logreader/impl/logdir_invoker.go
new file mode 100644
index 0000000..de226cc
--- /dev/null
+++ b/services/mgmt/logreader/impl/logdir_invoker.go
@@ -0,0 +1,81 @@
+package impl
+
+import (
+	"os"
+	"path"
+
+	"veyron/lib/glob"
+
+	"veyron2/ipc"
+	"veyron2/services/mounttable"
+	"veyron2/vlog"
+)
+
+// logDirectoryInvoker holds the state of an invocation.
+type logDirectoryInvoker struct {
+	// root is the root directory from which the object names are based.
+	root string
+	// suffix is the suffix of the current invocation that is assumed to
+	// be used as a relative object name to identify a log directory.
+	suffix string
+}
+
+// NewLogDirectoryInvoker is the invoker factory.
+func NewLogDirectoryInvoker(root, suffix string) *logDirectoryInvoker {
+	return &logDirectoryInvoker{
+		root:   path.Clean(root),
+		suffix: suffix,
+	}
+}
+
+// Glob streams the name of all the objects that match pattern.
+func (i *logDirectoryInvoker) Glob(context ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
+	vlog.VI(1).Infof("%v.Glob(%v)", i.suffix, pattern)
+	g, err := glob.Parse(pattern)
+	if err != nil {
+		return err
+	}
+	i.root = path.Join(i.root, i.suffix)
+	return i.globStep("", g, true, stream)
+}
+
+// globStep applies a glob recursively.
+func (i *logDirectoryInvoker) globStep(name string, g *glob.Glob, isDir bool, stream mounttable.GlobbableServiceGlobStream) error {
+	if g.Len() == 0 && !isDir {
+		if err := stream.SendStream().Send(mounttable.MountEntry{Name: name}); err != nil {
+			return err
+		}
+	}
+	if g.Finished() || !isDir {
+		return nil
+	}
+	dirName, err := translateNameToFilename(i.root, name)
+	if err != nil {
+		return err
+	}
+	f, err := os.Open(dirName)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return errNotFound
+		}
+		return errOperationFailed
+	}
+	fi, err := f.Readdir(0)
+	if err != nil {
+		return errOperationFailed
+	}
+	f.Close()
+	for _, file := range fi {
+		fileName := file.Name()
+		if fileName == "." || fileName == ".." {
+			continue
+		}
+		if ok, left := g.MatchInitialSegment(fileName); ok {
+			if err := i.globStep(path.Join(name, fileName), left, file.IsDir(), stream); err != nil {
+				return err
+			}
+		}
+
+	}
+	return nil
+}
diff --git a/services/mgmt/logreader/impl/logdir_invoker_test.go b/services/mgmt/logreader/impl/logdir_invoker_test.go
new file mode 100644
index 0000000..2375a43
--- /dev/null
+++ b/services/mgmt/logreader/impl/logdir_invoker_test.go
@@ -0,0 +1,180 @@
+package impl_test
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"reflect"
+	"sort"
+	"testing"
+
+	"veyron/services/mgmt/logreader/impl"
+
+	"veyron2/ipc"
+	"veyron2/naming"
+	"veyron2/rt"
+	"veyron2/security"
+	"veyron2/services/mounttable"
+	"veyron2/verror"
+)
+
+type logDirectoryDispatcher struct {
+	root string
+}
+
+func (d *logDirectoryDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	invoker := ipc.ReflectInvoker(mounttable.NewServerGlobbable(impl.NewLogDirectoryInvoker(d.root, suffix)))
+	return invoker, nil, nil
+}
+
+func TestLogDirectory(t *testing.T) {
+	runtime := rt.Init()
+
+	workdir, err := ioutil.TempDir("", "logreadertest")
+	if err != nil {
+		t.Fatalf("ioutil.TempDir: %v", err)
+	}
+	defer os.RemoveAll(workdir)
+	server, endpoint, err := startServer(t, &logDirectoryDispatcher{workdir})
+	if err != nil {
+		t.Fatalf("startServer failed: %v", err)
+	}
+	defer stopServer(t, server)
+
+	if err := os.Mkdir(path.Join(workdir, "subdir"), os.FileMode(0755)); err != nil {
+		t.Fatalf("os.Mkdir failed: %v", err)
+	}
+	tests := []string{
+		"foo.txt",
+		"bar.txt",
+		"subdir/foo2.txt",
+		"subdir/bar2.txt",
+	}
+	for _, s := range tests {
+		if err := ioutil.WriteFile(path.Join(workdir, s), []byte(s), os.FileMode(0644)); err != nil {
+			t.Fatalf("ioutil.WriteFile failed: %v", err)
+		}
+	}
+
+	// Try to access a directory that doesn't exist.
+	ld, err := mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//doesntexist"))
+	if err != nil {
+		t.Errorf("BindLogDirectory: %v", err)
+	}
+	stream, err := ld.Glob(runtime.NewContext(), "*")
+	if err != nil {
+		t.Errorf("unexpected error, got %v, want: nil", err)
+	}
+	if err := stream.Finish(); err != nil {
+		if expected := verror.NotFound; !verror.Is(err, expected) {
+			t.Errorf("unexpected error value, got %v, want: %v", err, expected)
+		}
+	}
+
+	// Try to access a directory that does exist.
+	ld, err = mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//"))
+	if err != nil {
+		t.Errorf("BindLogDirectory: %v", err)
+	}
+
+	stream, err = ld.Glob(runtime.NewContext(), "...")
+	if err != nil {
+		t.Errorf("Glob failed: %v", err)
+	}
+	results := []string{}
+	iterator := stream.RecvStream()
+	for count := 0; iterator.Advance(); count++ {
+		results = append(results, iterator.Value().Name)
+	}
+	sort.Strings(tests)
+	sort.Strings(results)
+	if !reflect.DeepEqual(tests, results) {
+		t.Errorf("unexpected result. Got %v, want %v", results, tests)
+	}
+
+	if err := iterator.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", iterator.Err())
+	}
+	if err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+
+	stream, err = ld.Glob(runtime.NewContext(), "*")
+	if err != nil {
+		t.Errorf("Glob failed: %v", err)
+	}
+	results = []string{}
+	iterator = stream.RecvStream()
+	for count := 0; iterator.Advance(); count++ {
+		results = append(results, iterator.Value().Name)
+	}
+	sort.Strings(results)
+	expected := []string{
+		"bar.txt",
+		"foo.txt",
+	}
+	if !reflect.DeepEqual(expected, results) {
+		t.Errorf("unexpected result. Got %v, want %v", results, expected)
+	}
+
+	if err := iterator.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", iterator.Err())
+	}
+	if err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+
+	stream, err = ld.Glob(runtime.NewContext(), "subdir/*")
+	if err != nil {
+		t.Errorf("Glob failed: %v", err)
+	}
+	results = []string{}
+	iterator = stream.RecvStream()
+	for count := 0; iterator.Advance(); count++ {
+		results = append(results, iterator.Value().Name)
+	}
+	sort.Strings(results)
+	expected = []string{
+		"subdir/bar2.txt",
+		"subdir/foo2.txt",
+	}
+	if !reflect.DeepEqual(expected, results) {
+		t.Errorf("unexpected result. Got %v, want %v", results, expected)
+	}
+
+	if err := iterator.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", iterator.Err())
+	}
+	if err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+
+	ld, err = mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//subdir"))
+	if err != nil {
+		t.Errorf("BindLogDirectory: %v", err)
+	}
+	stream, err = ld.Glob(runtime.NewContext(), "*")
+	if err != nil {
+		t.Errorf("Glob failed: %v", err)
+	}
+	results = []string{}
+	iterator = stream.RecvStream()
+	for count := 0; iterator.Advance(); count++ {
+		results = append(results, iterator.Value().Name)
+	}
+	sort.Strings(results)
+	expected = []string{
+		"bar2.txt",
+		"foo2.txt",
+	}
+	if !reflect.DeepEqual(expected, results) {
+		t.Errorf("unexpected result. Got %v, want %v", results, expected)
+	}
+
+	if err := iterator.Err(); err != nil {
+		t.Errorf("unexpected stream error: %v", iterator.Err())
+	}
+	if err := stream.Finish(); err != nil {
+		t.Errorf("Finish failed: %v", err)
+	}
+}
diff --git a/services/mgmt/logreader/impl/logfile_invoker.go b/services/mgmt/logreader/impl/logfile_invoker.go
new file mode 100644
index 0000000..74ca226
--- /dev/null
+++ b/services/mgmt/logreader/impl/logfile_invoker.go
@@ -0,0 +1,87 @@
+package impl
+
+import (
+	"io"
+	"math"
+	"os"
+	"path"
+
+	"veyron2/ipc"
+	"veyron2/services/mgmt/logreader"
+	"veyron2/vlog"
+)
+
+// logFileInvoker holds the state of a logfile invocation.
+type logFileInvoker struct {
+	// root is the root directory from which the object names are based.
+	root string
+	// suffix is the suffix of the current invocation that is assumed to
+	// be used as a relative object name to identify a log file.
+	suffix string
+}
+
+// NewLogFileInvoker is the invoker factory.
+func NewLogFileInvoker(root, suffix string) *logFileInvoker {
+	return &logFileInvoker{
+		root:   path.Clean(root),
+		suffix: suffix,
+	}
+}
+
+// Size returns the size of the log file, in bytes.
+func (i *logFileInvoker) Size(context ipc.ServerContext) (int64, error) {
+	vlog.VI(1).Infof("%v.Size()", i.suffix)
+	fname, err := translateNameToFilename(i.root, i.suffix)
+	if err != nil {
+		return 0, err
+	}
+	fi, err := os.Stat(fname)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, errNotFound
+		}
+		vlog.Errorf("Stat(%v) failed: %v", fname, err)
+		return 0, errOperationFailed
+	}
+	if fi.IsDir() {
+		return 0, errOperationFailed
+	}
+	return fi.Size(), nil
+}
+
+// ReadLog returns log entries from the log file.
+func (i *logFileInvoker) ReadLog(context ipc.ServerContext, startpos int64, numEntries int32, follow bool, stream logreader.LogFileServiceReadLogStream) (int64, error) {
+	vlog.VI(1).Infof("%v.ReadLog(%v, %v, %v)", i.suffix, startpos, numEntries, follow)
+	fname, err := translateNameToFilename(i.root, i.suffix)
+	if err != nil {
+		return 0, err
+	}
+	f, err := os.Open(fname)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return 0, errNotFound
+		}
+		return 0, errOperationFailed
+	}
+	reader := newFollowReader(context, f, startpos, follow)
+	if numEntries == logreader.AllEntries {
+		numEntries = int32(math.MaxInt32)
+	}
+	sender := stream.SendStream()
+	for n := int32(0); n < numEntries; n++ {
+		line, offset, err := reader.readLine()
+		if err == io.EOF && n > 0 {
+			return reader.tell(), nil
+		}
+		if err == io.EOF {
+			return reader.tell(), errEOF
+		}
+		if err != nil {
+			return reader.tell(), errOperationFailed
+		}
+		if err := sender.Send(logreader.LogEntry{Position: offset, Line: line}); err != nil {
+			return reader.tell(), err
+		}
+	}
+	return reader.tell(), nil
+}
diff --git a/services/mgmt/logreader/impl/invoker_test.go b/services/mgmt/logreader/impl/logfile_invoker_test.go
similarity index 84%
rename from services/mgmt/logreader/impl/invoker_test.go
rename to services/mgmt/logreader/impl/logfile_invoker_test.go
index ffa19f9..2b79268 100644
--- a/services/mgmt/logreader/impl/invoker_test.go
+++ b/services/mgmt/logreader/impl/logfile_invoker_test.go
@@ -16,40 +16,15 @@
 	"veyron2/verror"
 )
 
-type dispatcher struct {
+type logFileDispatcher struct {
 	root string
 }
 
-func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
-	invoker := ipc.ReflectInvoker(logreader.NewServerLogFile(impl.NewInvoker(d.root, suffix)))
+func (d *logFileDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+	invoker := ipc.ReflectInvoker(logreader.NewServerLogFile(impl.NewLogFileInvoker(d.root, suffix)))
 	return invoker, nil, nil
 }
 
-func startServer(t *testing.T, root string) (ipc.Server, string, error) {
-	disp := &dispatcher{root}
-	server, err := rt.R().NewServer()
-	if err != nil {
-		t.Fatalf("NewServer failed: %v", err)
-		return nil, "", err
-	}
-	endpoint, err := server.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatalf("Listen failed: %v", err)
-		return nil, "", err
-	}
-	if err := server.Serve("", disp); err != nil {
-		t.Fatalf("Serve failed: %v", err)
-		return nil, "", err
-	}
-	return server, endpoint.String(), nil
-}
-
-func stopServer(t *testing.T, server ipc.Server) {
-	if err := server.Stop(); err != nil {
-		t.Errorf("server.Stop failed: %v", err)
-	}
-}
-
 func writeAndSync(t *testing.T, w *os.File, s string) {
 	if _, err := w.WriteString(s); err != nil {
 		t.Fatalf("w.WriteString failed: %v", err)
@@ -67,7 +42,7 @@
 		t.Fatalf("ioutil.TempDir: %v", err)
 	}
 	defer os.RemoveAll(workdir)
-	server, endpoint, err := startServer(t, workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
 	if err != nil {
 		t.Fatalf("startServer failed: %v", err)
 	}
@@ -159,7 +134,7 @@
 		t.Fatalf("ioutil.TempDir: %v", err)
 	}
 	defer os.RemoveAll(workdir)
-	server, endpoint, err := startServer(t, workdir)
+	server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
 	if err != nil {
 		t.Fatalf("startServer failed: %v", err)
 	}