// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package manager provides an implementation of the Manager interface defined in v.io/x/ref/runtime/internal/rpc/stream.
package manager

import (
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"v.io/v23/naming"
	"v.io/v23/rpc"
	"v.io/v23/security"
	"v.io/v23/verror"
	"v.io/x/lib/vlog"

	"v.io/x/ref/lib/apilog"
	"v.io/x/ref/lib/stats"
	"v.io/x/ref/lib/stats/counter"
	inaming "v.io/x/ref/runtime/internal/naming"
	"v.io/x/ref/runtime/internal/rpc/stream"
	"v.io/x/ref/runtime/internal/rpc/stream/vc"
	"v.io/x/ref/runtime/internal/rpc/stream/vif"
)

const pkgPath = "v.io/x/ref/runtime/internal/rpc/stream/manager"

var (
	// These errors are intended to be used as arguments to higher
	// level errors and hence {1}{2} is omitted from their format
	// strings to avoid repeating these n-times in the final error
	// message visible to the user.
	errUnknownNetwork                          = reg(".errUnknownNetwork", "unknown network{:3}")
	errEndpointParseError                      = reg(".errEndpointParseError", "failed to parse endpoint {3}{:4}")
	errAlreadyShutdown                         = reg(".errAlreadyShutdown", "already shutdown")
	errProvidedServerBlessingsWithoutPrincipal = reg(".errServerBlessingsWithoutPrincipal", "blessings provided but with no principal")
	errNoBlessingNames                         = reg(".errNoBlessingNames", "no blessing names could be extracted for the provided principal")
)

const (
	// The default time after which an VIF is closed if no VC is opened.
	defaultStartTimeout = 3 * time.Second
	// The default time after which an idle VC is closed.
	defaultIdleTimeout = 30 * time.Second
)

// InternalNew creates a new stream.Manager for managing streams where the local
// process is identified by the provided RoutingID.
//
// As the name suggests, this method is intended for use only within packages
// placed inside v.io/x/ref/runtime/internal. Code outside the
// v.io/x/ref/runtime/internal/* packages should never call this method.
func InternalNew(rid naming.RoutingID) stream.Manager {
	statsPrefix := naming.Join("rpc", "stream", "routing-id", rid.String())
	m := &manager{
		rid:         rid,
		vifs:        vif.NewSet(),
		listeners:   make(map[listener]bool),
		statsPrefix: statsPrefix,
		killedConns: stats.NewCounter(naming.Join(statsPrefix, "killed-connections")),
	}
	stats.NewStringFunc(naming.Join(m.statsPrefix, "debug"), m.DebugString)
	return m
}

type manager struct {
	rid  naming.RoutingID
	vifs *vif.Set

	muListeners sync.Mutex
	listeners   map[listener]bool // GUARDED_BY(muListeners)
	shutdown    bool              // GUARDED_BY(muListeners)

	statsPrefix string
	killedConns *counter.Counter
}

var _ stream.Manager = (*manager)(nil)

type DialTimeout time.Duration

func (DialTimeout) RPCStreamVCOpt() {}
func (DialTimeout) RPCClientOpt() {
	defer apilog.LogCall(nil)(nil) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
}

func dial(d rpc.DialerFunc, network, address string, timeout time.Duration) (net.Conn, error) {
	if d != nil {
		conn, err := d(network, address, timeout)
		if err != nil {
			return nil, verror.New(stream.ErrDialFailed, nil, err)
		}
		return conn, nil
	}
	return nil, verror.New(stream.ErrDialFailed, nil, verror.New(errUnknownNetwork, nil, network))
}

func resolve(r rpc.ResolverFunc, network, address string) (string, string, error) {
	if r != nil {
		net, addr, err := r(network, address)
		if err != nil {
			return "", "", verror.New(stream.ErrResolveFailed, nil, err)
		}
		return net, addr, nil
	}
	return "", "", verror.New(stream.ErrResolveFailed, nil, verror.New(errUnknownNetwork, nil, network))
}

// FindOrDialVIF returns the network connection (VIF) to the provided address
// from the cache in the manager. If not already present in the cache, a new
// connection will be created using net.Dial.
func (m *manager) FindOrDialVIF(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (*vif.VIF, error) {
	// Extract options.
	var timeout time.Duration
	for _, o := range opts {
		switch v := o.(type) {
		case DialTimeout:
			timeout = time.Duration(v)
		}
	}
	addr := remote.Addr()
	d, r, _, _ := rpc.RegisteredProtocol(addr.Network())
	// (network, address) in the endpoint might not always match up
	// with the key used in the vifs. For example:
	// - conn, err := net.Dial("tcp", "www.google.com:80")
	//   fmt.Println(conn.RemoteAddr()) // Might yield the corresponding IP address
	// - Similarly, an unspecified IP address (net.IP.IsUnspecified) like "[::]:80"
	//   might yield "[::1]:80" (loopback interface) in conn.RemoteAddr().
	// Thus, look for VIFs with the resolved address.
	network, address, err := resolve(r, addr.Network(), addr.String())
	if err != nil {
		return nil, err
	}
	vf, unblock := m.vifs.BlockingFind(network, address)
	if vf != nil {
		vlog.VI(1).Infof("(%q, %q) resolved to (%q, %q) which exists in the VIF cache.", addr.Network(), addr.String(), network, address)
		return vf, nil
	}
	defer unblock()

	vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
	conn, err := dial(d, network, address, timeout)
	if err != nil {
		return nil, err
	}

	opts = append([]stream.VCOpt{vc.StartTimeout{defaultStartTimeout}}, opts...)
	vf, err = vif.InternalNewDialedVIF(conn, m.rid, principal, nil, m.deleteVIF, opts...)
	if err != nil {
		conn.Close()
		return nil, err
	}
	m.vifs.Insert(vf, network, address)
	return vf, nil
}

func (m *manager) Dial(remote naming.Endpoint, principal security.Principal, opts ...stream.VCOpt) (stream.VC, error) {
	// If vif.Dial fails because the cached network connection was broken, remove from
	// the cache and try once more.
	for retry := true; true; retry = false {
		vf, err := m.FindOrDialVIF(remote, principal, opts...)
		if err != nil {
			return nil, err
		}
		opts = append([]stream.VCOpt{vc.IdleTimeout{defaultIdleTimeout}}, opts...)
		vc, err := vf.Dial(remote, principal, opts...)
		if !retry || verror.ErrorID(err) != stream.ErrAborted.ID {
			return vc, err
		}
		vf.Close()
	}
	return nil, verror.NewErrInternal(nil) // Not reached
}

func (m *manager) deleteVIF(vf *vif.VIF) {
	vlog.VI(2).Infof("%p: VIF %v is closed, removing from cache", m, vf)
	m.vifs.Delete(vf)
}

func listen(protocol, address string) (net.Listener, error) {
	if _, _, l, _ := rpc.RegisteredProtocol(protocol); l != nil {
		ln, err := l(protocol, address)
		if err != nil {
			return nil, verror.New(stream.ErrNetwork, nil, err)
		}
		return ln, nil
	}
	return nil, verror.New(stream.ErrBadArg, nil, verror.New(errUnknownNetwork, nil, protocol))
}

func (m *manager) Listen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, naming.Endpoint, error) {
	bNames, err := extractBlessingNames(principal, blessings)
	if err != nil {
		return nil, nil, err
	}
	ln, ep, err := m.internalListen(protocol, address, principal, blessings, opts...)
	if err != nil {
		return nil, nil, err
	}
	ep.Blessings = bNames
	return ln, ep, nil
}

func (m *manager) internalListen(protocol, address string, principal security.Principal, blessings security.Blessings, opts ...stream.ListenerOpt) (stream.Listener, *inaming.Endpoint, error) {
	m.muListeners.Lock()
	if m.shutdown {
		m.muListeners.Unlock()
		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
	}
	m.muListeners.Unlock()

	if protocol == inaming.Network {
		// Act as if listening on the address of a remote proxy.
		ep, err := inaming.NewEndpoint(address)
		if err != nil {
			return nil, nil, verror.New(stream.ErrBadArg, nil, verror.New(errEndpointParseError, nil, address, err))
		}
		return m.remoteListen(ep, principal, opts)
	}
	netln, err := listen(protocol, address)
	if err != nil {
		return nil, nil, err
	}

	m.muListeners.Lock()
	if m.shutdown {
		m.muListeners.Unlock()
		closeNetListener(netln)
		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
	}

	ln := newNetListener(m, netln, principal, blessings, opts)
	m.listeners[ln] = true
	m.muListeners.Unlock()
	ep := &inaming.Endpoint{
		Protocol: protocol,
		Address:  netln.Addr().String(),
		RID:      m.rid,
	}
	return ln, ep, nil
}

func (m *manager) remoteListen(proxy naming.Endpoint, principal security.Principal, listenerOpts []stream.ListenerOpt) (stream.Listener, *inaming.Endpoint, error) {
	ln, ep, err := newProxyListener(m, proxy, principal, listenerOpts)
	if err != nil {
		return nil, nil, err
	}
	m.muListeners.Lock()
	defer m.muListeners.Unlock()
	if m.shutdown {
		ln.Close()
		return nil, nil, verror.New(stream.ErrBadState, nil, verror.New(errAlreadyShutdown, nil))
	}
	m.listeners[ln] = true
	return ln, ep, nil
}

func (m *manager) ShutdownEndpoint(remote naming.Endpoint) {
	vifs := m.vifs.List()
	total := 0
	for _, vf := range vifs {
		total += vf.ShutdownVCs(remote)
	}
	vlog.VI(1).Infof("ShutdownEndpoint(%q) closed %d VCs", remote, total)
}

func closeNetListener(ln net.Listener) {
	addr := ln.Addr()
	err := ln.Close()
	vlog.VI(1).Infof("Closed net.Listener on (%q, %q): %v", addr.Network(), addr, err)
}

func (m *manager) removeListener(ln listener) {
	m.muListeners.Lock()
	delete(m.listeners, ln)
	m.muListeners.Unlock()
}

func (m *manager) Shutdown() {
	stats.Delete(m.statsPrefix)
	m.muListeners.Lock()
	if m.shutdown {
		m.muListeners.Unlock()
		return
	}
	m.shutdown = true
	var wg sync.WaitGroup
	wg.Add(len(m.listeners))
	for ln, _ := range m.listeners {
		go func(ln stream.Listener) {
			ln.Close()
			wg.Done()
		}(ln)
	}
	m.listeners = make(map[listener]bool)
	m.muListeners.Unlock()
	wg.Wait()

	vifs := m.vifs.List()
	for _, vf := range vifs {
		vf.Close()
	}
}

func (m *manager) RoutingID() naming.RoutingID {
	return m.rid
}

func (m *manager) DebugString() string {
	vifs := m.vifs.List()

	m.muListeners.Lock()
	defer m.muListeners.Unlock()

	l := make([]string, 0)
	l = append(l, fmt.Sprintf("Manager: RoutingID:%v #VIFs:%d #Listeners:%d Shutdown:%t", m.rid, len(vifs), len(m.listeners), m.shutdown))
	if len(vifs) > 0 {
		l = append(l, "============================VIFs================================================")
		for ix, vif := range vifs {
			l = append(l, fmt.Sprintf("%4d) %v", ix, vif.DebugString()))
			l = append(l, "--------------------------------------------------------------------------------")
		}
	}
	if len(m.listeners) > 0 {
		l = append(l, "=======================================Listeners==================================================")
		l = append(l, "  (stream listeners, their local network listeners (missing for proxied listeners), and VIFS")
		for ln, _ := range m.listeners {
			l = append(l, ln.DebugString())
		}
	}
	return strings.Join(l, "\n")
}

func extractBlessingNames(p security.Principal, b security.Blessings) ([]string, error) {
	if !b.IsZero() && p == nil {
		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errProvidedServerBlessingsWithoutPrincipal, nil))
	}
	if p == nil {
		return nil, nil
	}
	var ret []string
	for b, _ := range p.BlessingsInfo(b) {
		ret = append(ret, b)
	}
	if len(ret) == 0 {
		return nil, verror.New(stream.ErrBadArg, nil, verror.New(errNoBlessingNames, nil))
	}
	return ret, nil
}
