Merge "x/ref/services/wspr: Switching WSPR to the new Server API and removing any references to DeprecatedServer"
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
index 0b6f0ea..76a5206 100644
--- a/runtime/internal/flow/conn/conn.go
+++ b/runtime/internal/flow/conn/conn.go
@@ -267,6 +267,12 @@
c.borrowing[msg.ID] = true
c.mu.Unlock()
c.handler.HandleFlow(f)
+ if err := f.q.put(ctx, msg.Payload); err != nil {
+ return err
+ }
+ if msg.Flags&message.CloseFlag != 0 {
+ f.close(ctx, NewErrFlowClosedRemotely(f.ctx))
+ }
case *message.Release:
release := make([]flowcontrol.Release, 0, len(msg.Counters))
diff --git a/runtime/internal/flow/conn/flow.go b/runtime/internal/flow/conn/flow.go
index c2b8d6c..dd6ffc6 100644
--- a/runtime/internal/flow/conn/flow.go
+++ b/runtime/internal/flow/conn/flow.go
@@ -90,20 +90,6 @@
sent := 0
var left []byte
err := f.worker.Run(f.ctx, func(tokens int) (int, bool, error) {
- if !f.opened {
- // TODO(mattr): we should be able to send multiple messages
- // in a single writeMsg call.
- err := f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
- ID: f.id,
- InitialCounters: defaultBufferSize,
- BlessingsKey: f.bkey,
- DischargeKey: f.dkey,
- })
- if err != nil {
- return 0, false, err
- }
- f.opened = true
- }
size := 0
var bufs [][]byte
if len(left) > 0 {
@@ -136,7 +122,22 @@
d.Flags |= message.DisableEncryptionFlag
}
sent += size
- return size, done, f.conn.mp.writeMsg(f.ctx, d)
+
+ var err error
+ if f.opened {
+ err = f.conn.mp.writeMsg(f.ctx, d)
+ } else {
+ err = f.conn.mp.writeMsg(f.ctx, &message.OpenFlow{
+ ID: f.id,
+ InitialCounters: defaultBufferSize,
+ BlessingsKey: f.bkey,
+ DischargeKey: f.dkey,
+ Flags: d.Flags,
+ Payload: d.Payload,
+ })
+ f.opened = true
+ }
+ return size, done, err
})
if alsoClose || err != nil {
f.close(f.ctx, err)
diff --git a/runtime/internal/flow/conn/message.go b/runtime/internal/flow/conn/message.go
index 6d44609..9435354 100644
--- a/runtime/internal/flow/conn/message.go
+++ b/runtime/internal/flow/conn/message.go
@@ -61,8 +61,19 @@
if _, err = p.rw.WriteMsg(p.writeBuf); err != nil {
return err
}
- if data, ok := m.(*message.Data); ok && (data.Flags&message.DisableEncryptionFlag != 0) {
- if _, err = p.rw.WriteMsg(data.Payload...); err != nil {
+ var payload [][]byte
+ switch msg := m.(type) {
+ case *message.Data:
+ if msg.Flags&message.DisableEncryptionFlag != 0 {
+ payload = msg.Payload
+ }
+ case *message.OpenFlow:
+ if msg.Flags&message.DisableEncryptionFlag != 0 {
+ payload = msg.Payload
+ }
+ }
+ if payload != nil {
+ if _, err = p.rw.WriteMsg(payload...); err != nil {
return err
}
}
diff --git a/services/debug/debug/debug_v23_test.go b/services/debug/debug/debug_v23_test.go
index c854906..2d0ee0d 100644
--- a/services/debug/debug/debug_v23_test.go
+++ b/services/debug/debug/debug_v23_test.go
@@ -6,6 +6,7 @@
import (
"bufio"
+ "encoding/json"
"fmt"
"os"
"path/filepath"
@@ -104,12 +105,34 @@
binary.Start("logs", "read", "__debug/logs/"+logName).WaitOrDie(os.Stderr, os.Stderr)
}
- got := binary.Start("stats", "read", "__debug/stats/rpc/server/routing-id/*/methods/ReadLog/latency-ms").Output()
+ readlogStatsEndpoint := "__debug/stats/rpc/server/routing-id/*/methods/ReadLog/latency-ms"
+ got := binary.Start("stats", "read", readlogStatsEndpoint).Output()
want := fmt.Sprintf("Count: %d", runCount)
if !strings.Contains(got, want) {
i.Fatalf("expected output %q to contain %q, but did not\n", got, want)
}
+
+ // Test "-json" format.
+ jsonOutput := binary.Start("stats", "read", "-json", readlogStatsEndpoint).Output()
+ var stats []struct {
+ Name string
+ Value struct {
+ Count int
+ }
+ }
+ if err := json.Unmarshal([]byte(jsonOutput), &stats); err != nil {
+ i.Fatalf("invalid json output: %v\n%s", err, jsonOutput) // include the decode error, not just the raw text
+ }
+ if want, got := 1, len(stats); want != got {
+ i.Fatalf("unexpected stats length, want %d, got %d", want, got)
+ }
+ if want, got := "__debug/stats/rpc/server/routing-id", stats[0].Name; !strings.HasPrefix(got, want) { // scope want/got to this check
+ i.Fatalf("unexpected Name field, want prefix %q, got %q", want, got)
+ }
+ if want, got := runCount, stats[0].Value.Count; want != got {
+ i.Fatalf("unexpected Value.Count field, want %d, got %d", want, got)
+ }
}
func V23TestStatsWatch(i *v23tests.T) {
diff --git a/services/debug/debug/doc.go b/services/debug/debug/doc.go
index a84d555..9d73f54 100644
--- a/services/debug/debug/doc.go
+++ b/services/debug/debug/doc.go
@@ -144,6 +144,9 @@
object names.
The debug stats read flags are:
+ -json=false
+ When true, the command will display the raw value of the object in json
+ format.
-raw=false
When true, the command will display the raw value of the object.
-type=false
diff --git a/services/debug/debug/impl.go b/services/debug/debug/impl.go
index 0a584d7..8ee18a6 100644
--- a/services/debug/debug/impl.go
+++ b/services/debug/debug/impl.go
@@ -8,6 +8,7 @@
package main
import (
+ "encoding/json"
"fmt"
"os"
"os/exec"
@@ -46,6 +47,7 @@
numEntries int
startPos int64
raw bool
+ rawJson bool
showType bool
pprofCmd string
)
@@ -61,6 +63,7 @@
// stats read flags
cmdStatsRead.Flags.BoolVar(&raw, "raw", false, "When true, the command will display the raw value of the object.")
+ cmdStatsRead.Flags.BoolVar(&rawJson, "json", false, "When true, the command will display the raw value of the object in json format.")
cmdStatsRead.Flags.BoolVar(&showType, "type", false, "When true, the type of the values will be displayed.")
// stats watch flags
@@ -319,6 +322,7 @@
}()
var lastErr error
+ jsonOutputs := []string{}
for {
select {
case err := <-errors:
@@ -326,9 +330,16 @@
fmt.Fprintln(env.Stderr, err)
case out, ok := <-output:
if !ok {
+ if rawJson {
+ fmt.Fprintf(env.Stdout, "[%s]", strings.Join(jsonOutputs, ","))
+ }
return lastErr
}
- fmt.Fprintln(env.Stdout, out)
+ if rawJson {
+ jsonOutputs = append(jsonOutputs, out)
+ } else {
+ fmt.Fprintln(env.Stdout, out)
+ }
}
}
}
@@ -347,7 +358,12 @@
errors <- fmt.Errorf("%s: %v", name, err)
// fv is still valid, so dump it out too.
}
- output <- fmt.Sprintf("%s: %v", name, fv)
+ // Add "name" to the returned json string if "-json" flag is set.
+ if rawJson {
+ output <- strings.Replace(fv, "{", fmt.Sprintf(`{"Name":%q,`, name), 1)
+ } else {
+ output <- fmt.Sprintf("%s: %v", name, fv)
+ }
}
var cmdStatsWatch = &cmdline.Command{
@@ -441,6 +457,29 @@
if showType {
ret += value.Type().String() + ": "
}
+ if rawJson {
+ var converted interface{}
+ // We will just return raw string if any of the following steps fails.
+ if err := vdl.Convert(&converted, value); err != nil {
+ retErr := fmt.Errorf("couldn't show raw content in json format: %v", err)
+ return ret + value.String(), retErr
+ }
+ result := struct {
+ Type string
+ Value interface{}
+ }{
+ Value: converted,
+ }
+ if showType {
+ result.Type = value.Type().String()
+ }
+ resultBytes, err := json.Marshal(result)
+ if err != nil {
+ retErr := fmt.Errorf("couldn't show raw content in json format: %v", err)
+ return ret + value.String(), retErr
+ }
+ return string(resultBytes), nil
+ }
if raw {
return ret + value.String(), nil
}