blob: 820a3a0c882c96c62fb0435759d718d2fe13d89f [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package vc
import (
"v.io/v23/verror"
"v.io/x/ref/internal/logger"
"v.io/x/ref/runtime/internal/lib/upcqueue"
"v.io/x/ref/runtime/internal/rpc/stream"
)
var (
// These errors are intended to be used as arguments to higher
// level errors and hence {1}{2} is omitted from their format
// strings to avoid repeating these n-times in the final error
// message visible to the user.
errListenerClosed = reg(".errListenerClosed", "listener has been closed")
errGetFromQueue = reg(".errGetFromQueue", "upcqueue.Get failed{:3}")
)
type listener struct {
q *upcqueue.T
}
var _ stream.Listener = (*listener)(nil)
func newListener() *listener { return &listener{q: upcqueue.New()} }
func (l *listener) Enqueue(f stream.Flow) error {
err := l.q.Put(f)
if err == upcqueue.ErrQueueIsClosed {
logger.Global().VI(3).Infof("Listener closed: %p, %p", l, l.q)
return verror.New(stream.ErrBadState, nil, verror.New(errListenerClosed, nil))
}
return err
}
func (l *listener) Accept() (stream.Flow, error) {
item, err := l.q.Get(nil)
if err == upcqueue.ErrQueueIsClosed {
return nil, verror.New(stream.ErrBadState, nil, verror.New(errListenerClosed, nil))
}
if err != nil {
return nil, verror.New(stream.ErrNetwork, nil, verror.New(errGetFromQueue, nil, err))
}
return item.(stream.Flow), nil
}
func (l *listener) Close() error {
logger.Global().VI(3).Infof("Listener being closed: %p, %p", l, l.q)
l.q.Close()
return nil
}