devtools/madb: prefix console messages with device ids

This CL implements prefixer, an io.Writer wrapper that internally
buffers the output and adds the given prefix to each line of the
output.

madb exec command now uses this prefixer to distinguish console
messages coming from different devices.

Change-Id: I2ac3302bdadd9a781b80a94ebf3fa20a5e2cac62
diff --git a/exec.go b/exec.go
index 653829b..3866328 100644
--- a/exec.go
+++ b/exec.go
@@ -6,6 +6,7 @@
 
 import (
 	"os"
+	"sync"
 
 	"v.io/x/lib/cmdline"
 	"v.io/x/lib/gosh"
@@ -33,6 +34,8 @@
 }
 
 func runMadbExec(env *cmdline.Env, args []string) error {
+	// TODO(youngseokyoon): consider making this function generic
+
 	if err := startAdbServer(); err != nil {
 		return err
 	}
@@ -42,23 +45,35 @@
 		return err
 	}
 
+	wg := sync.WaitGroup{}
+
 	for _, device := range devices {
-		sh := gosh.NewShell(gosh.Opts{})
-		defer sh.Cleanup()
+		// capture the current value
+		deviceCopy := device
 
-		cmdArgs := append([]string{"-s", device}, args...)
-		cmd := sh.Cmd("adb", cmdArgs...)
-
-		// TODO(youngseokyoon): use pipes instead to prefix console messages with their device names.
-		// For now, just forward all the messages to stdout/stderr.
-		cmd.AddStdoutWriter(gosh.NopWriteCloser(os.Stdout))
-		cmd.AddStderrWriter(gosh.NopWriteCloser(os.Stderr))
-
-		cmd.Start()
-		defer cmd.Wait()
-
-		// TODO(youngseokyoon): check for exit code of each command
+		wg.Add(1)
+		go func() {
+			// TODO(youngseokyoon): handle the error returned from here.
+			runMadbExecForDevice(env, args, deviceCopy)
+			wg.Done()
+		}()
 	}
 
+	wg.Wait()
+
 	return nil
 }
+
+func runMadbExecForDevice(env *cmdline.Env, args []string, device string) error {
+	sh := gosh.NewShell(gosh.Opts{})
+	defer sh.Cleanup()
+
+	cmdArgs := append([]string{"-s", device}, args...)
+	cmd := sh.Cmd("adb", cmdArgs...)
+
+	cmd.AddStdoutWriter(newPrefixer(device, os.Stdout))
+	cmd.AddStderrWriter(newPrefixer(device, os.Stderr))
+	cmd.Run()
+
+	return sh.Err
+}
diff --git a/prefixer.go b/prefixer.go
new file mode 100644
index 0000000..a67fdf5
--- /dev/null
+++ b/prefixer.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"bytes"
+	"io"
+	"os"
+)
+
+// prefixer is a io.WriteCloser that adds given prefix to each line of the output.
+type prefixer struct {
+	prefix []byte
+	writer io.Writer
+	buffer bytes.Buffer
+}
+
+// newPrefixer returns a new prefixer that uses the given prefix.
+func newPrefixer(prefix string, writer io.Writer) *prefixer {
+	return &prefixer{
+		prefix: []byte("[" + prefix + "]\t"),
+		writer: writer,
+		buffer: bytes.Buffer{},
+	}
+}
+
+func (p *prefixer) Write(b []byte) (int, error) {
+	// write the bytes to the buffer.
+	p.buffer.Write(b)
+
+	// For each line in the unread buffer, add the prefix and write it to the underlying writer.
+	for {
+		idx := bytes.IndexByte(p.buffer.Bytes(), '\n')
+		if idx == -1 {
+			break
+		}
+
+		prefixed := append(p.prefix, p.buffer.Next(idx+1)...)
+		if _, err := p.writer.Write(prefixed); err != nil {
+			return len(b), err
+		}
+	}
+
+	return len(b), nil
+}
+
+// Close flushes the remaining buffer content with prefix, and closes the underlying writer if applicable.
+func (p *prefixer) Close() error {
+	// Flush the remaining buffer content if any.
+	// Add the prefix at the beginning, and a newline character at the end.
+	if p.buffer.Len() > 0 {
+		prefixed := append(p.prefix, p.buffer.Bytes()...)
+		prefixed = append(prefixed, '\n')
+		if _, err := p.writer.Write(prefixed); err != nil {
+			return err
+		}
+	}
+
+	// Close the underlying writer unless the writer is stdout or stderr.
+	if p.writer != os.Stdout && p.writer != os.Stderr {
+		if closer, ok := p.writer.(io.Closer); ok {
+			return closer.Close()
+		}
+	}
+
+	return nil
+}
diff --git a/prefixer_test.go b/prefixer_test.go
new file mode 100644
index 0000000..9a78c86
--- /dev/null
+++ b/prefixer_test.go
@@ -0,0 +1,69 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+)
+
+func TestPrefixerSingleDevice(t *testing.T) {
+	buffer := bytes.Buffer{}
+	prefixer := newPrefixer("deviceid01", &buffer)
+
+	fmt.Fprintln(prefixer, "First line.")
+	fmt.Fprintln(prefixer, "Second line.")
+	prefixer.Close()
+
+	want := `[deviceid01]	First line.
+[deviceid01]	Second line.
+`
+
+	if got := buffer.String(); got != want {
+		t.Fatalf("unmatched results: got %v, want %v", got, want)
+	}
+}
+
+func TestPrefixerTwoDevices(t *testing.T) {
+	// Two devices case
+	buffer := bytes.Buffer{}
+	prefixer1 := newPrefixer("deviceid01", &buffer)
+	prefixer2 := newPrefixer("deviceid02", &buffer)
+
+	fmt.Fprintf(prefixer2, "Second")
+	fmt.Fprintln(prefixer1, "First line.")
+	fmt.Fprintln(prefixer2, " line.")
+	fmt.Fprintln(prefixer1, "Third line.")
+	fmt.Fprintln(prefixer2, "Fourth line.")
+	prefixer1.Close()
+	prefixer2.Close()
+
+	want := `[deviceid01]	First line.
+[deviceid02]	Second line.
+[deviceid01]	Third line.
+[deviceid02]	Fourth line.
+`
+
+	if got := buffer.String(); got != want {
+		t.Fatalf("unmatched results: got %v, want %v", got, want)
+	}
+}
+
+func TestPrefixerLastLine(t *testing.T) {
+	// For the last line, a newline character should be added automatically.
+	buffer := bytes.Buffer{}
+	prefixer := newPrefixer("deviceid01", &buffer)
+
+	fmt.Fprintf(prefixer, "First line.")
+	prefixer.Close()
+
+	want := `[deviceid01]	First line.
+`
+
+	if got := buffer.String(); got != want {
+		t.Fatalf("unmatched results: got %v, want %v", got, want)
+	}
+}