blob: 127d4511d9545504592e2397a8f8b2d75820f7ad [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"io"
"sync"
"sync/atomic"
"v.io/v23/verror"
"v.io/x/ref/runtime/internal/lib/iobuf"
vsync "v.io/x/ref/runtime/internal/lib/sync"
"v.io/x/ref/runtime/internal/lib/upcqueue"
"v.io/x/ref/runtime/internal/rpc/stream"
)
var (
	// These errors are intended to be passed as arguments to higher-level
	// errors, so {1}{2} is omitted from their format strings; otherwise
	// that prefix would be repeated once per nesting level in the final
	// error message shown to the user.

	// errGetFailed wraps an error returned by upcqueue.Get that readLocked
	// does not map to a more specific result.
	errGetFailed = reg(".errGetFailed", "upcqueue.Get failed:{:3}")
)
// readHandler is the interface used by the reader to notify other components
// of the number of bytes returned in Read calls.
//
// reader.Read invokes HandleRead after it has released the reader's mutex,
// and only for calls that returned at least one byte.
type readHandler interface {
	// HandleRead is told how many bytes a single Read call returned.
	HandleRead(bytes uint)
}
// reader implements the io.Reader and SetReadDeadline interfaces for a Flow,
// backed by iobuf.Slice objects read from a upcqueue.
//
// Data enters through Put and leaves through Read. Read calls are
// serialized by mu.
type reader struct {
	// handler is notified after every Read that returned at least one byte.
	handler readHandler
	// src is the queue that Put appends to and readLocked consumes from.
	// Items taken from it are type-asserted to *iobuf.Slice.
	src *upcqueue.T
	// mu serializes Read calls and protects buf and deadline.
	mu sync.Mutex
	// buf is the slice currently being consumed; it is nil when no
	// partially read slice is carried over between Read calls.
	buf *iobuf.Slice // GUARDED_BY(mu)
	// deadline is handed to src.Get when Read has to wait for data.
	deadline <-chan struct{} // GUARDED_BY(mu)
	// totalBytes is the running count of bytes returned by Read. It is
	// only accessed through sync/atomic and, being a uint32, wraps on
	// overflow.
	totalBytes uint32
}
// newReader builds a reader that reports completed reads to h and draws its
// data from a freshly created upcqueue.
func newReader(h readHandler) *reader {
	r := new(reader)
	r.handler = h
	r.src = upcqueue.New()
	return r
}
// Close closes the underlying queue. Once the queue reports
// upcqueue.ErrQueueIsClosed from Get, Read returns io.EOF.
func (r *reader) Close() {
	r.src.Close()
}
// Read copies queued data into b and returns the number of bytes copied.
//
// net.Conn requires every method to be callable from multiple goroutines at
// once, so the actual read runs under r.mu; serializing the calls guarantees
// that each Read hands back one contiguous chunk of the stream.
//
// The byte counter is updated and the handler notified only after the mutex
// has been released. The handler is skipped when nothing was read.
func (r *reader) Read(b []byte) (int, error) {
	r.mu.Lock()
	count, err := r.readLocked(b)
	r.mu.Unlock()

	atomic.AddUint32(&r.totalBytes, uint32(count))
	if count <= 0 {
		return count, err
	}
	r.handler.HandleRead(uint(count))
	return count, err
}
// readLocked fills b from the queued slices and returns the number of bytes
// copied. The caller must hold r.mu.
//
// When no slice is left over from a previous call it waits on
// r.src.Get(r.deadline) for one. A closed queue is reported as io.EOF, a
// canceled wait as a stream net error, and any other failure is wrapped in
// stream.ErrNetwork. After that, whole slices are drained into b for as long
// as they fit; a slice larger than the remaining space is copied partially
// and kept in r.buf for the next call.
func (r *reader) readLocked(b []byte) (int, error) {
	if r.buf == nil {
		item, err := r.src.Get(r.deadline)
		switch err {
		case nil:
			r.buf = item.(*iobuf.Slice)
		case upcqueue.ErrQueueIsClosed:
			return 0, io.EOF
		case vsync.ErrCanceled:
			// As per the net.Conn.Read specification.
			// NOTE(review): the trailing true/false presumably mark the
			// error as a timeout and as non-temporary - confirm against
			// stream.NewNetError.
			return 0, stream.NewNetError(verror.New(stream.ErrNetwork, nil, verror.New(errCanceled, nil)), true, false)
		default:
			return 0, verror.New(stream.ErrNetwork, nil, verror.New(errGetFailed, nil, err))
		}
	}

	total := 0
	for {
		if r.buf.Size() > len(b) {
			// The current slice does not fit: copy what does and keep
			// the remainder for the next call.
			part := copy(b, r.buf.Contents)
			r.buf.TruncateFront(uint(part))
			return total + part, nil
		}

		// The whole slice fits: copy it out and give it back.
		whole := copy(b, r.buf.Contents)
		total += whole
		b = b[whole:]
		r.buf.Release()
		r.buf = nil

		// Pick up more data only if TryGet can supply it; any error
		// simply ends this call with what has been copied so far.
		next, err := r.src.TryGet()
		if err != nil {
			return total, nil
		}
		r.buf = next.(*iobuf.Slice)
	}
}
// SetDeadline installs the channel that later reads pass to the queue's Get
// call when they have to wait for data.
//
// It takes r.mu, which Read holds for the whole of readLocked, so the call
// waits for any Read already in progress to return first.
func (r *reader) SetDeadline(deadline <-chan struct{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.deadline = deadline
}
// BytesRead reports the running total of bytes returned by Read. The value
// is loaded atomically, so no lock is needed.
func (r *reader) BytesRead() uint32 {
	total := atomic.LoadUint32(&r.totalBytes)
	return total
}
// Put appends slice to the queue that Read consumes from and returns
// whatever error the queue reports.
func (r *reader) Put(slice *iobuf.Slice) error {
	err := r.src.Put(slice)
	return err
}