// blob: f3f3788e751409a2cb264afab256ac38217548f5 [file] [log] [blame]

package vc
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"veyron.io/veyron/veyron/runtimes/google/lib/bqueue"
"veyron.io/veyron/veyron/runtimes/google/lib/iobuf"
vsync "veyron.io/veyron/veyron/runtimes/google/lib/sync"
)
// errWriterClosed is returned by Write when the writer has already been
// closed, or when the underlying bqueue writer reports that it is closed.
var errWriterClosed = errors.New("attempt to call Write on Flow that has been Closed")
// writer implements the io.Writer and SetWriteDeadline interfaces for Flow.
//
// Data passed to Write is cut into slices of at most MTU bytes, copied into
// iobuf.Slice objects obtained from Alloc, and queued on Sink.
type writer struct {
	MTU int // Maximum size (in bytes) of each slice Put into Sink.
	Sink bqueue.Writer // Buffer queue writer where data from Write is sent as iobuf.Slice objects.
	Alloc *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
	SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC. May be nil, in which case Write never borrows.
	mu sync.Mutex // Serializes Write calls and guards the fields marked GUARDED_BY(mu).
	wroteOnce bool // Set when the first Write attempts to borrow shared counters. GUARDED_BY(mu)
	isClosed bool // Set by finishClose. GUARDED_BY(mu)
	closed chan struct{} // Created in newWriter and closed (under mu) by finishClose. GUARDED_BY(mu)
	deadline <-chan struct{} // Cancellation channel passed to blocking calls made by Write. GUARDED_BY(mu)
	// Total number of bytes filled in by all Write calls on this writer.
	// Atomic operations are used to manipulate it.
	totalBytes uint32
	// Accounting for counters borrowed from the shared pool.
	muSharedCountersBorrowed sync.Mutex
	sharedCountersBorrowed int // Counters borrowed from SharedCounters and not yet returned. GUARDED_BY(muSharedCountersBorrowed)
}
// newWriter returns a writer that splits data into slices of at most mtu
// bytes, allocates them from alloc and queues them on sink. counters is the
// pool of shared counters that the first Write borrows from.
func newWriter(mtu int, sink bqueue.Writer, alloc *iobuf.Allocator, counters *vsync.Semaphore) *writer {
	w := new(writer)
	w.MTU = mtu
	w.Sink = sink
	w.Alloc = alloc
	w.SharedCounters = counters
	// The closed channel is created up front so that Closed() can hand it out
	// at any time; finishClose closes it.
	w.closed = make(chan struct{})
	return w
}
// Shutdown closes the writer and discards any queued up write buffers, i.e.,
// the bqueue.Get call will not see the buffers queued up at this writer.
// If removeWriter is true the writer will also be removed entirely from the
// bqueue, otherwise the now empty writer will eventually be returned by
// bqueue.Get.
func (w *writer) Shutdown(removeWriter bool) {
	// The sink is shut down before finishClose: Write holds w.mu while it is
	// inside Sink.Put, and finishClose needs w.mu.
	// NOTE(review): presumably shutting down the sink makes a pending Put
	// return — confirm against bqueue.
	w.Sink.Shutdown(removeWriter)
	w.finishClose()
}
// Close closes the writer without discarding any queued up write buffers.
func (w *writer) Close() {
	// The sink is closed before finishClose: Write holds w.mu while it is
	// inside Sink.Put, and finishClose needs w.mu.
	// NOTE(review): presumably closing the sink makes a pending Put return —
	// confirm against bqueue.
	w.Sink.Close()
	w.finishClose()
}
// IsClosed reports whether the writer has been closed. It acquires w.mu, so
// it waits for any Write call that is currently in progress.
func (w *writer) IsClosed() bool {
	w.mu.Lock()
	closed := w.isClosed
	w.mu.Unlock()
	return closed
}
// Closed returns a channel that is closed when the writer is closed.
func (w *writer) Closed() <-chan struct{} {
	var done <-chan struct{} = w.closed
	return done
}
// finishClose marks the writer as closed, releases the iobuf allocator and
// returns any counters still borrowed from the shared pool. It is the common
// tail of Close and Shutdown.
func (w *writer) finishClose() {
	// IsClosed() and Closed() indicate that the writer is closed before
	// finishClose() completes. This is safe because Alloc and shared counters
	// are guarded, and are not accessed elsewhere after w.closed is closed.
	w.mu.Lock()
	// finishClose() is idempotent, but Go's builtin close is not.
	if !w.isClosed {
		w.isClosed = true
		close(w.closed)
	}
	w.Alloc.Release()
	w.mu.Unlock()

	w.muSharedCountersBorrowed.Lock()
	// SharedCounters is optional: Write skips borrowing when it is nil, so
	// there is nothing to give back (and no semaphore to call) in that case.
	if w.SharedCounters != nil {
		w.SharedCounters.IncN(uint(w.sharedCountersBorrowed))
	}
	w.sharedCountersBorrowed = 0
	w.muSharedCountersBorrowed.Unlock()
}
// Write implements the Write call for a Flow.
//
// Flow control is achieved using receive buffers (aka counters), wherein the
// receiving end sends out the number of bytes that it is willing to read. To
// avoid an additional round-trip for the creation of new flows, the very first
// write of a new flow borrows counters from a shared pool.
//
// The data is queued on Sink in pieces of at most MTU bytes. Write returns the
// number of bytes queued and, on failure, errWriterClosed if the writer is
// closed, timeoutError{} if the deadline channel fired, or a descriptive error
// otherwise.
func (w *writer) Write(b []byte) (int, error) {
	// net.Conn requires that multiple goroutines be able to invoke methods
	// simultaneously, so the whole call is serialized on w.mu.
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.isClosed {
		return 0, errWriterClosed
	}

	total := 0
	for len(b) > 0 {
		// Each piece is bounded by the MTU.
		chunk := w.MTU
		if len(b) < chunk {
			chunk = len(b)
		}

		// First write on this flow: borrow quota from the shared pool, which
		// further caps the piece at MaxSharedBytes.
		if w.SharedCounters != nil && !w.wroteOnce {
			w.wroteOnce = true
			if chunk > MaxSharedBytes {
				chunk = MaxSharedBytes
			}
			err := w.SharedCounters.DecN(uint(chunk), w.deadline)
			if err == vsync.ErrCanceled {
				return 0, timeoutError{}
			}
			if err != nil {
				return 0, fmt.Errorf("failed to get quota from receive buffers shared by all new flows on a VC: %v", err)
			}
			// Record the loan so that Release and finishClose can repay it.
			w.muSharedCountersBorrowed.Lock()
			w.sharedCountersBorrowed = chunk
			w.muSharedCountersBorrowed.Unlock()
			w.Sink.Release(chunk)
		}

		buf := w.Alloc.Copy(b[:chunk])
		if err := w.Sink.Put(buf, w.deadline); err != nil {
			// The slice was not accepted by the queue; give it back and
			// account only for what was queued before the failure.
			buf.Release()
			atomic.AddUint32(&w.totalBytes, uint32(total))
			if err == bqueue.ErrCancelled || err == vsync.ErrCanceled {
				return total, timeoutError{}
			}
			if err == bqueue.ErrWriterIsClosed {
				return total, errWriterClosed
			}
			return total, fmt.Errorf("bqueue.Writer.Put failed: %v", err)
		}

		total += chunk
		b = b[chunk:]
	}

	atomic.AddUint32(&w.totalBytes, uint32(total))
	return total, nil
}
// SetDeadline installs the channel that Write passes to its blocking calls
// (SharedCounters.DecN and Sink.Put). Because Write holds w.mu for its whole
// duration, the new deadline only applies to Write calls that start after
// SetDeadline returns.
func (w *writer) SetDeadline(deadline <-chan struct{}) {
	w.mu.Lock()
	w.deadline = deadline
	w.mu.Unlock()
}
// Release allows the next 'bytes' of data to be removed from the buffer queue
// writer and passed to bqueue.Get.
//
// Counters that were borrowed from the shared pool are repaid first: the
// released bytes go back to SharedCounters until the loan is cleared, and
// only the remainder is released on Sink.
func (w *writer) Release(bytes int) {
	w.muSharedCountersBorrowed.Lock()
	defer w.muSharedCountersBorrowed.Unlock()

	borrowed := w.sharedCountersBorrowed
	if borrowed == 0 {
		// Nothing owed to the shared pool.
		w.Sink.Release(bytes)
		return
	}
	if borrowed >= bytes {
		// The whole release goes towards repaying the loan.
		w.SharedCounters.IncN(uint(bytes))
		w.sharedCountersBorrowed = borrowed - bytes
		return
	}
	// Repay the rest of the loan and release what is left over on the sink.
	w.SharedCounters.IncN(uint(borrowed))
	w.Sink.Release(bytes - borrowed)
	w.sharedCountersBorrowed = 0
}
// BytesWritten returns the running total of bytes queued by all Write calls
// on this writer. The counter is read atomically and does not take w.mu.
func (w *writer) BytesWritten() uint32 {
	total := atomic.LoadUint32(&w.totalBytes)
	return total
}