veyron/services/mgmt/logreader/impl: Add glob for logs directory.
This change adds a Globbable implementation to find the list of files in
a logs directory.
There is also some minor refactoring.
Change-Id: Iaded5cfc43d2dc49a19c3160a7ecfecc1cfcf77e
diff --git a/services/mgmt/logreader/impl/common.go b/services/mgmt/logreader/impl/common.go
new file mode 100644
index 0000000..ccc98b9
--- /dev/null
+++ b/services/mgmt/logreader/impl/common.go
@@ -0,0 +1,33 @@
+// Package impl implements the LogFile interface from
+// veyron2/services/mgmt/logreader, which can be used to allow remote access to
+// log files, and the Globbable interface from veyron2/services/mounttable to
+// find the files in a logs directory.
+package impl
+
+import (
+ "path"
+ "strings"
+
+ "veyron2/services/mgmt/logreader"
+ "veyron2/verror"
+)
+
+var (
+ errCanceled = verror.Abortedf("operation canceled")
+ errNotFound = verror.NotFoundf("log file not found")
+ errEOF = verror.Make(logreader.EOF, "EOF")
+ errOperationFailed = verror.Internalf("operation failed")
+)
+
+// translateNameToFilename returns the file name that corresponds to the object
+// name.
+func translateNameToFilename(root, name string) (string, error) {
+ p := path.Join(root, name)
+ // Make sure we're not asked to read a file outside of the root
+ // directory. This could happen if suffix contains "../", which get
+ // collapsed by path.Join().
+ if !strings.HasPrefix(p, root) {
+ return "", errOperationFailed
+ }
+ return p, nil
+}
diff --git a/services/mgmt/logreader/impl/common_test.go b/services/mgmt/logreader/impl/common_test.go
new file mode 100644
index 0000000..991d833
--- /dev/null
+++ b/services/mgmt/logreader/impl/common_test.go
@@ -0,0 +1,31 @@
+package impl_test
+
+import (
+ "testing"
+ "veyron2/ipc"
+ "veyron2/rt"
+)
+
+func startServer(t *testing.T, disp ipc.Dispatcher) (ipc.Server, string, error) {
+ server, err := rt.R().NewServer()
+ if err != nil {
+ t.Fatalf("NewServer failed: %v", err)
+ return nil, "", err
+ }
+ endpoint, err := server.Listen("tcp", "localhost:0")
+ if err != nil {
+ t.Fatalf("Listen failed: %v", err)
+ return nil, "", err
+ }
+ if err := server.Serve("", disp); err != nil {
+ t.Fatalf("Serve failed: %v", err)
+ return nil, "", err
+ }
+ return server, endpoint.String(), nil
+}
+
+func stopServer(t *testing.T, server ipc.Server) {
+ if err := server.Stop(); err != nil {
+ t.Errorf("server.Stop failed: %v", err)
+ }
+}
diff --git a/services/mgmt/logreader/impl/invoker.go b/services/mgmt/logreader/impl/invoker.go
deleted file mode 100644
index 25f58a7..0000000
--- a/services/mgmt/logreader/impl/invoker.go
+++ /dev/null
@@ -1,111 +0,0 @@
-// Package impl implements the LogFile interface from
-// veyron2/services/mgmt/logreader. It can be used to allow remote access to
-// log files.
-package impl
-
-import (
- "io"
- "math"
- "os"
- "path"
- "strings"
-
- "veyron2/ipc"
- "veyron2/services/mgmt/logreader"
- "veyron2/verror"
- "veyron2/vlog"
-)
-
-// invoker holds the state of a logfile invocation.
-type invoker struct {
- // root is the root directory from which the object names are based.
- root string
- // suffix is the suffix of the current invocation that is assumed to
- // be used as a relative object name to identify a log file.
- suffix string
-}
-
-var (
- errCanceled = verror.Abortedf("operation canceled")
- errNotFound = verror.NotFoundf("log file not found")
- errEOF = verror.Make(logreader.EOF, "EOF")
- errOperationFailed = verror.Internalf("operation failed")
-)
-
-// NewInvoker is the invoker factory.
-func NewInvoker(root, suffix string) *invoker {
- return &invoker{
- root: path.Clean(root),
- suffix: suffix,
- }
-}
-
-// fileName returns the file name that corresponds to the object name.
-func (i *invoker) fileName() (string, error) {
- p := path.Join(i.root, i.suffix)
- // Make sure we're not asked to read a file outside of the root
- // directory. This could happen if suffix contains "../", which get
- // collapsed by path.Join().
- if !strings.HasPrefix(p, i.root) {
- return "", errOperationFailed
- }
- return p, nil
-}
-
-// Size returns the size of the log file, in bytes.
-func (i *invoker) Size(context ipc.ServerContext) (int64, error) {
- vlog.VI(1).Infof("%v.Size()", i.suffix)
- fname, err := i.fileName()
- if err != nil {
- return 0, err
- }
- fi, err := os.Stat(fname)
- if err != nil {
- if os.IsNotExist(err) {
- return 0, errNotFound
- }
- vlog.Errorf("Stat(%v) failed: %v", fname, err)
- return 0, errOperationFailed
- }
- if fi.IsDir() {
- return 0, errOperationFailed
- }
- return fi.Size(), nil
-}
-
-// ReadLog returns log entries from the log file.
-func (i *invoker) ReadLog(context ipc.ServerContext, startpos int64, numEntries int32, follow bool, stream logreader.LogFileServiceReadLogStream) (int64, error) {
- vlog.VI(1).Infof("%v.ReadLog(%v, %v, %v)", i.suffix, startpos, numEntries, follow)
- fname, err := i.fileName()
- if err != nil {
- return 0, err
- }
- f, err := os.Open(fname)
- if err != nil {
- if os.IsNotExist(err) {
- return 0, errNotFound
- }
- return 0, errOperationFailed
- }
- reader := newFollowReader(context, f, startpos, follow)
- if numEntries == logreader.AllEntries {
- numEntries = int32(math.MaxInt32)
- }
- sender := stream.SendStream()
- for n := int32(0); n < numEntries; n++ {
- line, offset, err := reader.readLine()
- if err == io.EOF && n > 0 {
- return reader.tell(), nil
- }
- if err == io.EOF {
- return reader.tell(), errEOF
- }
- if err != nil {
- return reader.tell(), errOperationFailed
- }
- if err := sender.Send(logreader.LogEntry{Position: offset, Line: line}); err != nil {
- return reader.tell(), err
- }
- }
- return reader.tell(), nil
-}
diff --git a/services/mgmt/logreader/impl/logdir_invoker.go b/services/mgmt/logreader/impl/logdir_invoker.go
new file mode 100644
index 0000000..de226cc
--- /dev/null
+++ b/services/mgmt/logreader/impl/logdir_invoker.go
@@ -0,0 +1,81 @@
+package impl
+
+import (
+ "os"
+ "path"
+
+ "veyron/lib/glob"
+
+ "veyron2/ipc"
+ "veyron2/services/mounttable"
+ "veyron2/vlog"
+)
+
+// logDirectoryInvoker holds the state of an invocation.
+type logDirectoryInvoker struct {
+ // root is the root directory from which the object names are based.
+ root string
+ // suffix is the suffix of the current invocation that is assumed to
+ // be used as a relative object name to identify a log directory.
+ suffix string
+}
+
+// NewLogDirectoryInvoker is the invoker factory.
+func NewLogDirectoryInvoker(root, suffix string) *logDirectoryInvoker {
+ return &logDirectoryInvoker{
+ root: path.Clean(root),
+ suffix: suffix,
+ }
+}
+
+// Glob streams the name of all the objects that match pattern.
+func (i *logDirectoryInvoker) Glob(context ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
+ vlog.VI(1).Infof("%v.Glob(%v)", i.suffix, pattern)
+ g, err := glob.Parse(pattern)
+ if err != nil {
+ return err
+ }
+ i.root = path.Join(i.root, i.suffix)
+ return i.globStep("", g, true, stream)
+}
+
+// globStep applies a glob recursively.
+func (i *logDirectoryInvoker) globStep(name string, g *glob.Glob, isDir bool, stream mounttable.GlobbableServiceGlobStream) error {
+ if g.Len() == 0 && !isDir {
+ if err := stream.SendStream().Send(mounttable.MountEntry{Name: name}); err != nil {
+ return err
+ }
+ }
+ if g.Finished() || !isDir {
+ return nil
+ }
+ dirName, err := translateNameToFilename(i.root, name)
+ if err != nil {
+ return err
+ }
+ f, err := os.Open(dirName)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return errNotFound
+ }
+ return errOperationFailed
+ }
+ fi, err := f.Readdir(0)
+ if err != nil {
+ return errOperationFailed
+ }
+ f.Close()
+ for _, file := range fi {
+ fileName := file.Name()
+ if fileName == "." || fileName == ".." {
+ continue
+ }
+ if ok, left := g.MatchInitialSegment(fileName); ok {
+ if err := i.globStep(path.Join(name, fileName), left, file.IsDir(), stream); err != nil {
+ return err
+ }
+ }
+
+ }
+ return nil
+}
diff --git a/services/mgmt/logreader/impl/logdir_invoker_test.go b/services/mgmt/logreader/impl/logdir_invoker_test.go
new file mode 100644
index 0000000..2375a43
--- /dev/null
+++ b/services/mgmt/logreader/impl/logdir_invoker_test.go
@@ -0,0 +1,180 @@
+package impl_test
+
+import (
+ "io/ioutil"
+ "os"
+ "path"
+ "reflect"
+ "sort"
+ "testing"
+
+ "veyron/services/mgmt/logreader/impl"
+
+ "veyron2/ipc"
+ "veyron2/naming"
+ "veyron2/rt"
+ "veyron2/security"
+ "veyron2/services/mounttable"
+ "veyron2/verror"
+)
+
+type logDirectoryDispatcher struct {
+ root string
+}
+
+func (d *logDirectoryDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+ invoker := ipc.ReflectInvoker(mounttable.NewServerGlobbable(impl.NewLogDirectoryInvoker(d.root, suffix)))
+ return invoker, nil, nil
+}
+
+func TestLogDirectory(t *testing.T) {
+ runtime := rt.Init()
+
+ workdir, err := ioutil.TempDir("", "logreadertest")
+ if err != nil {
+ t.Fatalf("ioutil.TempDir: %v", err)
+ }
+ defer os.RemoveAll(workdir)
+ server, endpoint, err := startServer(t, &logDirectoryDispatcher{workdir})
+ if err != nil {
+ t.Fatalf("startServer failed: %v", err)
+ }
+ defer stopServer(t, server)
+
+ if err := os.Mkdir(path.Join(workdir, "subdir"), os.FileMode(0755)); err != nil {
+ t.Fatalf("os.Mkdir failed: %v", err)
+ }
+ tests := []string{
+ "foo.txt",
+ "bar.txt",
+ "subdir/foo2.txt",
+ "subdir/bar2.txt",
+ }
+ for _, s := range tests {
+ if err := ioutil.WriteFile(path.Join(workdir, s), []byte(s), os.FileMode(0644)); err != nil {
+ t.Fatalf("ioutil.WriteFile failed: %v", err)
+ }
+ }
+
+ // Try to access a directory that doesn't exist.
+ ld, err := mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//doesntexist"))
+ if err != nil {
+ t.Errorf("BindLogDirectory: %v", err)
+ }
+ stream, err := ld.Glob(runtime.NewContext(), "*")
+ if err != nil {
+ t.Errorf("unexpected error, got %v, want: nil", err)
+ }
+ if err := stream.Finish(); err != nil {
+ if expected := verror.NotFound; !verror.Is(err, expected) {
+ t.Errorf("unexpected error value, got %v, want: %v", err, expected)
+ }
+ }
+
+ // Try to access a directory that does exist.
+ ld, err = mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//"))
+ if err != nil {
+ t.Errorf("BindLogDirectory: %v", err)
+ }
+
+ stream, err = ld.Glob(runtime.NewContext(), "...")
+ if err != nil {
+ t.Errorf("Glob failed: %v", err)
+ }
+ results := []string{}
+ iterator := stream.RecvStream()
+ for count := 0; iterator.Advance(); count++ {
+ results = append(results, iterator.Value().Name)
+ }
+ sort.Strings(tests)
+ sort.Strings(results)
+ if !reflect.DeepEqual(tests, results) {
+ t.Errorf("unexpected result. Got %v, want %v", results, tests)
+ }
+
+ if err := iterator.Err(); err != nil {
+ t.Errorf("unexpected stream error: %v", iterator.Err())
+ }
+ if err := stream.Finish(); err != nil {
+ t.Errorf("Finish failed: %v", err)
+ }
+
+ stream, err = ld.Glob(runtime.NewContext(), "*")
+ if err != nil {
+ t.Errorf("Glob failed: %v", err)
+ }
+ results = []string{}
+ iterator = stream.RecvStream()
+ for count := 0; iterator.Advance(); count++ {
+ results = append(results, iterator.Value().Name)
+ }
+ sort.Strings(results)
+ expected := []string{
+ "bar.txt",
+ "foo.txt",
+ }
+ if !reflect.DeepEqual(expected, results) {
+ t.Errorf("unexpected result. Got %v, want %v", results, expected)
+ }
+
+ if err := iterator.Err(); err != nil {
+ t.Errorf("unexpected stream error: %v", iterator.Err())
+ }
+ if err := stream.Finish(); err != nil {
+ t.Errorf("Finish failed: %v", err)
+ }
+
+ stream, err = ld.Glob(runtime.NewContext(), "subdir/*")
+ if err != nil {
+ t.Errorf("Glob failed: %v", err)
+ }
+ results = []string{}
+ iterator = stream.RecvStream()
+ for count := 0; iterator.Advance(); count++ {
+ results = append(results, iterator.Value().Name)
+ }
+ sort.Strings(results)
+ expected = []string{
+ "subdir/bar2.txt",
+ "subdir/foo2.txt",
+ }
+ if !reflect.DeepEqual(expected, results) {
+ t.Errorf("unexpected result. Got %v, want %v", results, expected)
+ }
+
+ if err := iterator.Err(); err != nil {
+ t.Errorf("unexpected stream error: %v", iterator.Err())
+ }
+ if err := stream.Finish(); err != nil {
+ t.Errorf("Finish failed: %v", err)
+ }
+
+ ld, err = mounttable.BindGlobbable(naming.JoinAddressName(endpoint, "//subdir"))
+ if err != nil {
+ t.Errorf("BindLogDirectory: %v", err)
+ }
+ stream, err = ld.Glob(runtime.NewContext(), "*")
+ if err != nil {
+ t.Errorf("Glob failed: %v", err)
+ }
+ results = []string{}
+ iterator = stream.RecvStream()
+ for count := 0; iterator.Advance(); count++ {
+ results = append(results, iterator.Value().Name)
+ }
+ sort.Strings(results)
+ expected = []string{
+ "bar2.txt",
+ "foo2.txt",
+ }
+ if !reflect.DeepEqual(expected, results) {
+ t.Errorf("unexpected result. Got %v, want %v", results, expected)
+ }
+
+ if err := iterator.Err(); err != nil {
+ t.Errorf("unexpected stream error: %v", iterator.Err())
+ }
+ if err := stream.Finish(); err != nil {
+ t.Errorf("Finish failed: %v", err)
+ }
+}
diff --git a/services/mgmt/logreader/impl/logfile_invoker.go b/services/mgmt/logreader/impl/logfile_invoker.go
new file mode 100644
index 0000000..74ca226
--- /dev/null
+++ b/services/mgmt/logreader/impl/logfile_invoker.go
@@ -0,0 +1,87 @@
+package impl
+
+import (
+ "io"
+ "math"
+ "os"
+ "path"
+
+ "veyron2/ipc"
+ "veyron2/services/mgmt/logreader"
+ "veyron2/vlog"
+)
+
+// logFileInvoker holds the state of a logfile invocation.
+type logFileInvoker struct {
+ // root is the root directory from which the object names are based.
+ root string
+ // suffix is the suffix of the current invocation that is assumed to
+ // be used as a relative object name to identify a log file.
+ suffix string
+}
+
+// NewLogFileInvoker is the invoker factory.
+func NewLogFileInvoker(root, suffix string) *logFileInvoker {
+ return &logFileInvoker{
+ root: path.Clean(root),
+ suffix: suffix,
+ }
+}
+
+// Size returns the size of the log file, in bytes.
+func (i *logFileInvoker) Size(context ipc.ServerContext) (int64, error) {
+ vlog.VI(1).Infof("%v.Size()", i.suffix)
+ fname, err := translateNameToFilename(i.root, i.suffix)
+ if err != nil {
+ return 0, err
+ }
+ fi, err := os.Stat(fname)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return 0, errNotFound
+ }
+ vlog.Errorf("Stat(%v) failed: %v", fname, err)
+ return 0, errOperationFailed
+ }
+ if fi.IsDir() {
+ return 0, errOperationFailed
+ }
+ return fi.Size(), nil
+}
+
+// ReadLog returns log entries from the log file.
+func (i *logFileInvoker) ReadLog(context ipc.ServerContext, startpos int64, numEntries int32, follow bool, stream logreader.LogFileServiceReadLogStream) (int64, error) {
+ vlog.VI(1).Infof("%v.ReadLog(%v, %v, %v)", i.suffix, startpos, numEntries, follow)
+ fname, err := translateNameToFilename(i.root, i.suffix)
+ if err != nil {
+ return 0, err
+ }
+ f, err := os.Open(fname)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return 0, errNotFound
+ }
+ return 0, errOperationFailed
+ }
+ reader := newFollowReader(context, f, startpos, follow)
+ if numEntries == logreader.AllEntries {
+ numEntries = int32(math.MaxInt32)
+ }
+ sender := stream.SendStream()
+ for n := int32(0); n < numEntries; n++ {
+ line, offset, err := reader.readLine()
+ if err == io.EOF && n > 0 {
+ return reader.tell(), nil
+ }
+ if err == io.EOF {
+ return reader.tell(), errEOF
+ }
+ if err != nil {
+ return reader.tell(), errOperationFailed
+ }
+ if err := sender.Send(logreader.LogEntry{Position: offset, Line: line}); err != nil {
+ return reader.tell(), err
+ }
+ }
+ return reader.tell(), nil
+}
diff --git a/services/mgmt/logreader/impl/invoker_test.go b/services/mgmt/logreader/impl/logfile_invoker_test.go
similarity index 84%
rename from services/mgmt/logreader/impl/invoker_test.go
rename to services/mgmt/logreader/impl/logfile_invoker_test.go
index ffa19f9..2b79268 100644
--- a/services/mgmt/logreader/impl/invoker_test.go
+++ b/services/mgmt/logreader/impl/logfile_invoker_test.go
@@ -16,40 +16,15 @@
"veyron2/verror"
)
-type dispatcher struct {
+type logFileDispatcher struct {
root string
}
-func (d *dispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
- invoker := ipc.ReflectInvoker(logreader.NewServerLogFile(impl.NewInvoker(d.root, suffix)))
+func (d *logFileDispatcher) Lookup(suffix string) (ipc.Invoker, security.Authorizer, error) {
+ invoker := ipc.ReflectInvoker(logreader.NewServerLogFile(impl.NewLogFileInvoker(d.root, suffix)))
return invoker, nil, nil
}
-func startServer(t *testing.T, root string) (ipc.Server, string, error) {
- disp := &dispatcher{root}
- server, err := rt.R().NewServer()
- if err != nil {
- t.Fatalf("NewServer failed: %v", err)
- return nil, "", err
- }
- endpoint, err := server.Listen("tcp", "localhost:0")
- if err != nil {
- t.Fatalf("Listen failed: %v", err)
- return nil, "", err
- }
- if err := server.Serve("", disp); err != nil {
- t.Fatalf("Serve failed: %v", err)
- return nil, "", err
- }
- return server, endpoint.String(), nil
-}
-
-func stopServer(t *testing.T, server ipc.Server) {
- if err := server.Stop(); err != nil {
- t.Errorf("server.Stop failed: %v", err)
- }
-}
-
func writeAndSync(t *testing.T, w *os.File, s string) {
if _, err := w.WriteString(s); err != nil {
t.Fatalf("w.WriteString failed: %v", err)
@@ -67,7 +42,7 @@
t.Fatalf("ioutil.TempDir: %v", err)
}
defer os.RemoveAll(workdir)
- server, endpoint, err := startServer(t, workdir)
+ server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
if err != nil {
t.Fatalf("startServer failed: %v", err)
}
@@ -159,7 +134,7 @@
t.Fatalf("ioutil.TempDir: %v", err)
}
defer os.RemoveAll(workdir)
- server, endpoint, err := startServer(t, workdir)
+ server, endpoint, err := startServer(t, &logFileDispatcher{workdir})
if err != nil {
t.Fatalf("startServer failed: %v", err)
}