TBR

v: renaming the v directory to go

Change-Id: I4fd9f6ee2895d8034c23b65927eb118980b3c17a
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
new file mode 100644
index 0000000..6638aaa
--- /dev/null
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -0,0 +1,233 @@
+// Package manager provides an implementation of the Manager interface defined in veyron2/ipc/stream.
+package manager
+
+import (
+	"crypto/tls"
+	"errors"
+	"fmt"
+	"net"
+	"strings"
+	"sync"
+
+	"veyron/runtimes/google/ipc/stream/crypto"
+	"veyron/runtimes/google/ipc/stream/vif"
+	"veyron/runtimes/google/ipc/version"
+	inaming "veyron/runtimes/google/naming"
+
+	"veyron2/ipc/stream"
+	"veyron2/naming"
+	"veyron2/verror"
+	"veyron2/vlog"
+)
+
+// errShutDown is the error returned by Listen and remoteListen when they
+// observe that the manager's shutdown flag is set.
+var errShutDown = errors.New("manager has been shut down")
+
+// InternalNew returns a stream.Manager whose local end is identified by rid.
+// The returned manager starts with a fresh vif.Set, an empty listener map and
+// a TLS client session cache created via tls.NewLRUClientSessionCache(-1).
+//
+// The "Internal" prefix is deliberate: only packages that live under
+// veyron/runtimes/google may call this function. Code outside the
+// veyron/runtimes/google/* packages should never call it.
+func InternalNew(rid naming.RoutingID) stream.Manager {
+	m := new(manager)
+	m.rid = rid
+	m.vifs = vif.NewSet()
+	m.sessionCache = crypto.TLSClientSessionCache{tls.NewLRUClientSessionCache(-1)}
+	m.listeners = make(map[listener]bool)
+	return m
+}
+
+// manager is the stream.Manager implementation returned by InternalNew.
+type manager struct {
+	rid          naming.RoutingID             // routing id supplied to InternalNew
+	vifs         *vif.Set                     // VIFs known to this manager; see FindOrDialVIF
+	sessionCache crypto.TLSClientSessionCache // passed as an extra option to every VIF Dial
+
+	muListeners sync.Mutex        // protects listeners and shutdown
+	listeners   map[listener]bool // GUARDED_BY(muListeners); listeners registered by Listen/remoteListen
+	shutdown    bool              // GUARDED_BY(muListeners); set by Shutdown
+}
+
+// FindOrDialVIF looks up a VIF for addr in the manager's VIF cache and returns
+// it if one is found. Otherwise it dials addr with net.Dial, re-checks the
+// cache using the connection's resolved remote address, and only if that
+// lookup also misses wraps the new connection in a VIF and inserts it into
+// the cache.
+func (m *manager) FindOrDialVIF(addr net.Addr) (*vif.VIF, error) {
+	network, address := addr.Network(), addr.String()
+	if cached := m.vifs.Find(network, address); cached != nil {
+		return cached, nil
+	}
+	vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
+	conn, err := net.Dial(network, address)
+	if err != nil {
+		return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
+	}
+	// The key under which a VIF is cached does not always equal the
+	// (network, address) pair that was dialed. For example:
+	// - net.Dial("tcp", "www.google.com:80") might report the corresponding
+	//   IP address from conn.RemoteAddr().
+	// - An unspecified IP address (net.IP.IsUnspecified) such as "[::]:80"
+	//   might come back as "[::1]:80" (loopback interface).
+	// So consult the cache a second time with the resolved address.
+	resolved := conn.RemoteAddr()
+	if cached := m.vifs.Find(resolved.Network(), resolved.String()); cached != nil {
+		vlog.VI(1).Infof("(%q, %q) resolved to (%q, %q) which exists in the VIF cache. Closing newly Dialed connection", network, address, resolved.Network(), resolved)
+		conn.Close()
+		return cached, nil
+	}
+	created, err := vif.InternalNewDialedVIF(conn, m.rid, nil)
+	if err != nil {
+		conn.Close()
+		return nil, fmt.Errorf("failed to create VIF: %v", err)
+	}
+	// TODO(ashankar): Two goroutines invoking manager.Dial at the same time
+	// can both get here and insert two VIFs for the same remote network
+	// address. Normally that is harmless, but it is troublesome when the
+	// remote endpoint is a proxy, because the proxy requires a single network
+	// connection per routing id. Find a clean way to handle this. One option
+	// is to allow only a single VIF per remote network address - that needs
+	// to be thought through.
+	m.vifs.Insert(created)
+	return created, nil
+}
+
+// Dial creates a VC to remote. It obtains a VIF for remote.Addr() through
+// FindOrDialVIF and dials the VC over it, passing the manager's TLS session
+// cache as an additional option after the caller-supplied opts.
+//
+// If the first attempt fails with verror.Aborted, the VIF is taken to be
+// broken: it is removed from the cache and the whole sequence is tried
+// exactly once more. Any other outcome, and the outcome of the second
+// attempt, is returned to the caller as is.
+func (m *manager) Dial(remote naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) {
+	// Build the option list once. The full slice expression caps the capacity
+	// at len(opts), so append always copies into a new backing array instead
+	// of writing m.sessionCache into spare capacity of a slice owned by the
+	// caller (which callers dialing concurrently with the same opts would
+	// race on).
+	vcOpts := append(opts[:len(opts):len(opts)], m.sessionCache)
+	for retry := true; true; retry = false {
+		vf, err := m.FindOrDialVIF(remote.Addr())
+		if err != nil {
+			return nil, err
+		}
+		vc, err := vf.Dial(remote, vcOpts...)
+		if !retry || verror.ErrorID(err) != verror.Aborted {
+			return vc, err
+		}
+		// Aborted on the first attempt: drop the VIF so that the retry does
+		// not pick it up from the cache again.
+		m.vifs.Delete(vf)
+		vlog.VI(2).Infof("VIF %v is closed, removing from cache", vf)
+	}
+	// The loop condition is the constant true, so every path returns from
+	// inside the loop; this statement only satisfies the compiler.
+	return nil, verror.Internalf("should not reach here")
+}
+
+// Listen starts listening on (protocol, address) and returns the resulting
+// stream listener together with an endpoint built from the listener's network
+// address and the manager's routing id. If protocol equals inaming.Network,
+// address is instead parsed as the endpoint of a proxy and the work is
+// delegated to remoteListen.
+//
+// Listen returns errShutDown if the manager's shutdown flag is set, either
+// before the network listener is created or by the time it is registered.
+func (m *manager) Listen(protocol, address string, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
+	m.muListeners.Lock()
+	down := m.shutdown
+	m.muListeners.Unlock()
+	if down {
+		return nil, nil, errShutDown
+	}
+
+	if protocol == inaming.Network {
+		// address names a remote proxy: behave as if listening on its address.
+		proxy, err := inaming.NewEndpoint(address)
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to parse endpoint %q: %v", address, err)
+		}
+		return m.remoteListen(proxy, opts)
+	}
+	netln, err := net.Listen(protocol, address)
+	if err != nil {
+		return nil, nil, fmt.Errorf("net.Listen(%q, %q) failed: %v", protocol, address, err)
+	}
+
+	// The lock was released while net.Listen ran, so the shutdown flag has to
+	// be checked again before the new listener is registered.
+	m.muListeners.Lock()
+	if !m.shutdown {
+		ln := newNetListener(m, netln, opts)
+		m.listeners[ln] = true
+		m.muListeners.Unlock()
+		return ln, version.Endpoint(netln.Addr().Network(), netln.Addr().String(), m.rid), nil
+	}
+	m.muListeners.Unlock()
+	closeNetListener(netln)
+	return nil, nil, errShutDown
+}
+
+// remoteListen creates a listener that acts as if listening on the address of
+// the given proxy, registers it with the manager and returns it along with
+// the proxied endpoint derived from the manager's routing id.
+//
+// It returns errShutDown, after closing the new listener, if the manager's
+// shutdown flag is set by the time the listener is ready to be registered.
+func (m *manager) remoteListen(proxy naming.Endpoint, listenerOpts []stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
+	ep, err := version.ProxiedEndpoint(m.rid, proxy)
+	if err != nil {
+		return nil, nil, err
+	}
+	ln, err := newProxyListener(m, proxy, listenerOpts)
+	if err != nil {
+		return nil, nil, err
+	}
+	m.muListeners.Lock()
+	if m.shutdown {
+		// Release muListeners before closing, exactly as Listen and Shutdown
+		// do. removeListener acquires muListeners and sync.Mutex is not
+		// reentrant, so a Close that unregisters the listener (presumably the
+		// case; the listener implementation is not in this file) would
+		// deadlock if the lock were still held here.
+		m.muListeners.Unlock()
+		ln.Close()
+		return nil, nil, errShutDown
+	}
+	m.listeners[ln] = true
+	m.muListeners.Unlock()
+	return ln, ep, nil
+}
+
+// ShutdownEndpoint calls ShutdownVCs(remote) on every VIF currently listed by
+// the manager's VIF set and logs the sum of the returned counts.
+func (m *manager) ShutdownEndpoint(remote naming.Endpoint) {
+	var closed int
+	for _, vf := range m.vifs.List() {
+		closed += vf.ShutdownVCs(remote)
+	}
+	vlog.VI(1).Infof("ShutdownEndpoint(%q) closed %d VCs", remote, closed)
+}
+
+// closeNetListener closes ln and logs the listener's address together with
+// the result of Close at verbosity level 1. The address is read before the
+// listener is closed.
+func closeNetListener(ln net.Listener) {
+	local := ln.Addr()
+	closeErr := ln.Close()
+	vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", local.Network(), local, closeErr)
+}
+
+// removeListener deletes ln from the manager's set of registered listeners.
+// It acquires muListeners, so it must not be called with that lock held.
+func (m *manager) removeListener(ln listener) {
+	m.muListeners.Lock()
+	defer m.muListeners.Unlock()
+	delete(m.listeners, ln)
+}
+
+// Shutdown closes every registered listener and then every VIF listed by the
+// manager's VIF set. Only the first call does any work; later calls return
+// immediately. Once the shutdown flag is set, Listen and remoteListen fail
+// with errShutDown.
+func (m *manager) Shutdown() {
+	m.muListeners.Lock()
+	if m.shutdown {
+		m.muListeners.Unlock()
+		return
+	}
+	m.shutdown = true
+	// Close the listeners concurrently. The goroutines are started while the
+	// lock is held but are only waited for after it has been released.
+	var pending sync.WaitGroup
+	for ln := range m.listeners {
+		pending.Add(1)
+		go func(l stream.Listener) {
+			l.Close()
+			pending.Done()
+		}(ln)
+	}
+	m.listeners = make(map[listener]bool)
+	m.muListeners.Unlock()
+	pending.Wait()
+
+	for _, vf := range m.vifs.List() {
+		vf.Close()
+	}
+}
+
+// DebugString returns a multi-line, human-readable summary of the manager: a
+// header line with the routing id, the VIF and listener counts and the
+// shutdown flag, followed by the DebugString of every VIF and of every
+// registered listener. Listeners are emitted in map iteration order.
+func (m *manager) DebugString() string {
+	vifs := m.vifs.List()
+
+	m.muListeners.Lock()
+	defer m.muListeners.Unlock()
+
+	lines := []string{
+		fmt.Sprintf("Manager: RoutingID:%v #VIFs:%d #Listeners:%d Shutdown:%t", m.rid, len(vifs), len(m.listeners), m.shutdown),
+	}
+	if len(vifs) > 0 {
+		lines = append(lines, "============================VIFs================================================")
+		for ix, vf := range vifs {
+			lines = append(lines,
+				fmt.Sprintf("%4d) %v", ix, vf.DebugString()),
+				"--------------------------------------------------------------------------------",
+			)
+		}
+	}
+	if len(m.listeners) > 0 {
+		lines = append(lines,
+			"=======================================Listeners==================================================",
+			"  (stream listeners, their local network listeners (missing for proxied listeners), and VIFS",
+		)
+		for ln := range m.listeners {
+			lines = append(lines, ln.DebugString())
+		}
+	}
+	return strings.Join(lines, "\n")
+}