Merge "x/ref/services/wspr: Switching WSPR to the new Server API and removing any references to DeprecatedServer"
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0b6f0ea..76a5206 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -267,6 +267,12 @@
 		c.borrowing[msg.ID] = true
 		c.mu.Unlock()
 		c.handler.HandleFlow(f)
+		if err := f.q.put(ctx, msg.Payload); err != nil {
+			return err
+		}
+		if msg.Flags&message.CloseFlag != 0 {
+			f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+		}
 
 	case *message.Release:
 		release := make([]flowcontrol.Release, 0, len(msg.Counters))
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index c2b8d6c..dd6ffc6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -90,20 +90,6 @@
 	sent := 0
 	var left []byte
 	err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
-		if !f.opened {
-			// TODO(mattr): we should be able to send multiple messages
-			// in a single writeMsg call.
-			err := f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
-				ID:              f.id,
-				InitialCounters: defaultBufferSize,
-				BlessingsKey:    f.bkey,
-				DischargeKey:    f.dkey,
-			})
-			if err != nil {
-				return 0, false, err
-			}
-			f.opened = true
-		}
 		size := 0
 		var bufs [][]byte
 		if len(left) > 0 {
@@ -136,7 +122,22 @@
 			d.Flags |= message.DisableEncryptionFlag
 		}
 		sent += size
-		return size, done, f.conn.mp.writeMsg(f.ctx, d)
+
+		var err error
+		if f.opened {
+			err = f.conn.mp.writeMsg(f.ctx, d)
+		} else {
+			err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
+				ID:              f.id,
+				InitialCounters: defaultBufferSize,
+				BlessingsKey:    f.bkey,
+				DischargeKey:    f.dkey,
+				Flags:           d.Flags,
+				Payload:         d.Payload,
+			})
+			f.opened = true
+		}
+		return size, done, err
 	})
 	if alsoClose || err != nil {
 		f.close(f.ctx, err)
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 6d44609..9435354 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -61,8 +61,19 @@
 	if _, err = p.rw.WriteMsg(p.writeBuf); err != nil {
 		return err
 	}
-	if data, ok := m.(*message.Data); ok && (data.Flags&message.DisableEncryptionFlag != 0) {
-		if _, err = p.rw.WriteMsg(data.Payload...); err != nil {
+	var payload [][]byte
+	switch msg := m.(type) {
+	case *message.Data:
+		if msg.Flags&message.DisableEncryptionFlag != 0 {
+			payload = msg.Payload
+		}
+	case *message.OpenFlow:
+		if msg.Flags&message.DisableEncryptionFlag != 0 {
+			payload = msg.Payload
+		}
+	}
+	if payload != nil {
+		if _, err = p.rw.WriteMsg(payload...); err != nil {
 			return err
 		}
 	}
diff --git a/services/debug/debug/debug_v23_test.go b/services/debug/debug/debug_v23_test.go
index c854906..2d0ee0d 100644
--- a/services/debug/debug/debug_v23_test.go
+++ b/services/debug/debug/debug_v23_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"bufio"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -104,12 +105,34 @@
 		binary.Start("logs", "read", "__debug/logs/"+logName).WaitOrDie(os.Stderr, os.Stderr)
 	}
 
-	got := binary.Start("stats", "read", "__debug/stats/rpc/server/routing-id/*/methods/ReadLog/latency-ms").Output()
+	readlogStatsEndpoint := "__debug/stats/rpc/server/routing-id/*/methods/ReadLog/latency-ms"
+	got := binary.Start("stats", "read", readlogStatsEndpoint).Output()
 
 	want := fmt.Sprintf("Count: %d", runCount)
 	if !strings.Contains(got, want) {
 		i.Fatalf("expected output %q to contain %q, but did not\n", got, want)
 	}
+
+	// Test "-json" format.
+	jsonOutput := binary.Start("stats", "read", "-json", readlogStatsEndpoint).Output()
+	var stats []struct {
+		Name  string
+		Value struct {
+			Count int
+		}
+	}
+	if err := json.Unmarshal([]byte(jsonOutput), &stats); err != nil {
+		i.Fatalf("invalid json output:\n%s", jsonOutput)
+	}
+	if want, got := 1, len(stats); want != got {
+		i.Fatalf("unexpected stats length, want %d, got %d", want, got)
+	}
+	if !strings.HasPrefix(stats[0].Name, "__debug/stats/rpc/server/routing-id") {
+		i.Fatalf("unexpected Name field, want %q, got %q", want, got)
+	}
+	if want, got := runCount, stats[0].Value.Count; want != got {
+		i.Fatalf("unexpected Value.Count field, want %d, got %d", want, got)
+	}
 }
 
 func V23TestStatsWatch(i *v23tests.T) {
diff --git a/services/debug/debug/doc.go b/services/debug/debug/doc.go
index a84d555..9d73f54 100644
--- a/services/debug/debug/doc.go
+++ b/services/debug/debug/doc.go
@@ -144,6 +144,9 @@
 object names.
 
 The debug stats read flags are:
+ -json=false
+   When true, the command will display the raw value of the object in json
+   format.
  -raw=false
    When true, the command will display the raw value of the object.
  -type=false
diff --git a/services/debug/debug/impl.go b/services/debug/debug/impl.go
index 0a584d7..8ee18a6 100644
--- a/services/debug/debug/impl.go
+++ b/services/debug/debug/impl.go
@@ -8,6 +8,7 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"os"
 	"os/exec"
@@ -46,6 +47,7 @@
 	numEntries int
 	startPos   int64
 	raw        bool
+	rawJson    bool
 	showType   bool
 	pprofCmd   string
 )
@@ -61,6 +63,7 @@
 
 	// stats read flags
 	cmdStatsRead.Flags.BoolVar(&raw, "raw", false, "When true, the command will display the raw value of the object.")
+	cmdStatsRead.Flags.BoolVar(&rawJson, "json", false, "When true, the command will display the raw value of the object in json format.")
 	cmdStatsRead.Flags.BoolVar(&showType, "type", false, "When true, the type of the values will be displayed.")
 
 	// stats watch flags
@@ -319,6 +322,7 @@
 	}()
 
 	var lastErr error
+	jsonOutputs := []string{}
 	for {
 		select {
 		case err := <-errors:
@@ -326,9 +330,16 @@
 			fmt.Fprintln(env.Stderr, err)
 		case out, ok := <-output:
 			if !ok {
+				if rawJson {
+					fmt.Fprintf(env.Stdout, "[%s]", strings.Join(jsonOutputs, ","))
+				}
 				return lastErr
 			}
-			fmt.Fprintln(env.Stdout, out)
+			if rawJson {
+				jsonOutputs = append(jsonOutputs, out)
+			} else {
+				fmt.Fprintln(env.Stdout, out)
+			}
 		}
 	}
 }
@@ -347,7 +358,12 @@
 		errors <- fmt.Errorf("%s: %v", name, err)
 		// fv is still valid, so dump it out too.
 	}
-	output <- fmt.Sprintf("%s: %v", name, fv)
+	// Add "name" to the returned json string if "-json" flag is set.
+	if rawJson {
+		output <- strings.Replace(fv, "{", fmt.Sprintf(`{"Name":%q,`, name), 1)
+	} else {
+		output <- fmt.Sprintf("%s: %v", name, fv)
+	}
 }
 
 var cmdStatsWatch = &cmdline.Command{
@@ -441,6 +457,29 @@
 	if showType {
 		ret += value.Type().String() + ": "
 	}
+	if rawJson {
+		var converted interface{}
+		// We will just return raw string if any of the following steps fails.
+		if err := vdl.Convert(&converted, value); err != nil {
+			retErr := fmt.Errorf("couldn't show raw content in json format: %v", err)
+			return ret + value.String(), retErr
+		}
+		result := struct {
+			Type  string
+			Value interface{}
+		}{
+			Value: converted,
+		}
+		if showType {
+			result.Type = value.Type().String()
+		}
+		resultBytes, err := json.Marshal(result)
+		if err != nil {
+			retErr := fmt.Errorf("couldn't show raw content in json format: %v", err)
+			return ret + value.String(), retErr
+		}
+		return string(resultBytes), nil
+	}
 	if raw {
 		return ret + value.String(), nil
 	}