blob: d07058eba81c9b1761d46df84b1874d9ebf78247 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package local
import (
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
"v.io/v23/context"
"v.io/v23/flow"
)
func init() {
flow.RegisterProtocol("local", &local{listeners: map[addr]*listener{}})
}
type addr string
func (a addr) Network() string { return "local" }
func (a addr) String() string { return string(a) }
type conn struct {
addr addr
incoming chan []byte
peer *conn
*wire
}
func (c *conn) LocalAddr() net.Addr { return c.addr }
func (c *conn) ReadMsg() ([]byte, error) {
select {
case msg := <-c.incoming:
return msg, nil
case <-c.wire.closech:
return nil, io.EOF
}
}
func (c *conn) WriteMsg(data ...[]byte) (int, error) {
l := 0
for _, b := range data {
l += len(b)
}
agg := make([]byte, 0, l)
for _, b := range data {
agg = append(agg, b...)
}
select {
case c.peer.incoming <- agg:
return l, nil
case <-c.wire.closech:
return 0, io.EOF
}
}
type wire struct {
mu sync.Mutex
closed bool
closech chan struct{}
}
func (w *wire) Close() error {
w.mu.Lock()
if !w.closed {
w.closed = true
close(w.closech)
}
w.mu.Unlock()
return nil
}
type listener struct {
addr addr
conns chan *conn
closech chan struct{}
local *local
closed bool // protected by local.mu
}
func (l *listener) Accept(ctx *context.T) (flow.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.closech:
return nil, fmt.Errorf("listener closed")
}
}
func (l *listener) Addr() net.Addr {
return l.addr
}
func (l *listener) Close() error {
l.local.mu.Lock()
if !l.closed {
l.closed = true
close(l.closech)
}
l.local.mu.Unlock()
return nil
}
type local struct {
mu sync.Mutex
next int
listeners map[addr]*listener
}
func (l *local) nextAddrLocked() addr {
ret := strconv.FormatInt(int64(l.next), 10)
l.next++
return addr(ret)
}
func (l *local) Dial(ctx *context.T, network, address string, timeout time.Duration) (flow.Conn, error) {
l.mu.Lock()
listener := l.listeners[addr(address)]
daddr, aaddr := l.nextAddrLocked(), l.nextAddrLocked()
l.mu.Unlock()
if listener == nil {
return nil, fmt.Errorf("unreachable")
}
w := &wire{closech: make(chan struct{})}
d := &conn{addr: daddr, incoming: make(chan []byte), wire: w}
a := &conn{addr: aaddr, incoming: make(chan []byte), wire: w}
d.peer, a.peer = a, d
t := time.NewTimer(timeout)
defer t.Stop()
select {
case <-t.C:
return nil, fmt.Errorf("timeout")
case listener.conns <- a:
return d, nil
case <-listener.closech:
return nil, fmt.Errorf("unreachable")
}
}
func (l *local) Resolve(ctx *context.T, network, address string) (string, []string, error) {
return network, []string{address}, nil
}
func (l *local) Listen(ctx *context.T, network, address string) (flow.Listener, error) {
defer l.mu.Unlock()
l.mu.Lock()
a := addr(address)
if a == "" {
a = l.nextAddrLocked()
}
if _, ok := l.listeners[a]; ok {
return nil, fmt.Errorf("address in use")
}
listener := &listener{
addr: a,
conns: make(chan *conn, 1),
closech: make(chan struct{}),
local: l,
}
l.listeners[a] = listener
return listener, nil
}