| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package lib |
| |
| import ( |
| "bytes" |
| "encoding/hex" |
| "fmt" |
| "io" |
| "sync" |
| |
| "v.io/v23/vom" |
| ) |
| |
| func HexVomEncode(v interface{}, te *vom.TypeEncoder) (string, error) { |
| var buf bytes.Buffer |
| var encoder *vom.Encoder |
| if te != nil { |
| encoder = vom.NewEncoderWithTypeEncoder(&buf, te) |
| } else { |
| |
| encoder = vom.NewEncoder(&buf) |
| } |
| if err := encoder.Encode(v); err != nil { |
| return "", err |
| } |
| return hex.EncodeToString(buf.Bytes()), nil |
| } |
| |
| func HexVomEncodeOrDie(v interface{}, te *vom.TypeEncoder) string { |
| s, err := HexVomEncode(v, te) |
| if err != nil { |
| panic(err) |
| } |
| return s |
| } |
| |
| func HexVomDecode(data string, v interface{}, td *vom.TypeDecoder) error { |
| binbytes, err := hex.DecodeString(data) |
| if err != nil { |
| return fmt.Errorf("Error decoding hex string %q: %v", data, err) |
| } |
| var decoder *vom.Decoder |
| if td != nil { |
| decoder = vom.NewDecoderWithTypeDecoder(bytes.NewReader(binbytes), td) |
| } else { |
| decoder = vom.NewDecoder(bytes.NewReader(binbytes)) |
| } |
| return decoder.Decode(v) |
| } |
| |
// TypeReader is an io.Reader over an in-memory buffer that can keep growing
// after reading has started: Add appends more bytes, and Read blocks until
// bytes are available or the reader has been closed. This is useful for
// merging discrete messages that are part of the same flow.
type TypeReader struct {
	mu       sync.Mutex   // guards buf and isClosed
	cond     *sync.Cond   // bound to mu by NewTypeReader; wakes readers blocked in Read
	buf      bytes.Buffer // bytes added but not yet read
	isClosed bool         // set by Close; Read then returns io.EOF once buf is empty
}
| |
| func NewTypeReader() *TypeReader { |
| reader := &TypeReader{} |
| reader.cond = sync.NewCond(&reader.mu) |
| return reader |
| } |
| |
| func (r *TypeReader) Add(data string) error { |
| binBytes, err := hex.DecodeString(data) |
| if err != nil { |
| return err |
| } |
| r.mu.Lock() |
| _, err = r.buf.Write(binBytes) |
| r.mu.Unlock() |
| r.cond.Signal() |
| return err |
| } |
| |
| func (r *TypeReader) Read(p []byte) (int, error) { |
| r.mu.Lock() |
| defer r.mu.Unlock() |
| for { |
| if r.buf.Len() > 0 { |
| return r.buf.Read(p) |
| } |
| if r.isClosed { |
| return 0, io.EOF |
| } |
| r.cond.Wait() |
| } |
| |
| } |
| |
| func (r *TypeReader) Close() { |
| r.mu.Lock() |
| r.isClosed = true |
| r.mu.Unlock() |
| r.cond.Broadcast() |
| } |
| |
| // TypeWriter implements io.Writer but allows changing the underlying buffer. |
| // This is useful for merging discrete messages that are part of the same flow. |
| type TypeWriter struct { |
| writer ClientWriter |
| } |
| |
| func NewTypeWriter(w ClientWriter) *TypeWriter { |
| return &TypeWriter{writer: w} |
| } |
| |
| func (w *TypeWriter) Write(p []byte) (int, error) { |
| err := w.writer.Send(ResponseTypeMessage, p) |
| if err != nil { |
| return 0, err |
| } |
| return len(p), nil |
| } |