// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package rpc

import (
	"sync"

	"v.io/v23/context"
	"v.io/v23/flow"
	"v.io/v23/vom"
)

type typeCacheEntry struct {
	enc    *vom.TypeEncoder
	dec    *vom.TypeDecoder
	cancel context.CancelFunc
	writer bool
	ready  chan struct{}
}

const typeCacheCollectInterval = 1000

type typeCache struct {
	mu     sync.Mutex
	flows  map[flow.ManagedConn]*typeCacheEntry
	writes int
}

func newTypeCache() *typeCache {
	return &typeCache{flows: make(map[flow.ManagedConn]*typeCacheEntry)}
}

func (tc *typeCache) writer(c flow.ManagedConn) (write func(flow.Flow, context.CancelFunc)) {
	tc.mu.Lock()
	tce := tc.flows[c]
	if tce == nil {
		tce = &typeCacheEntry{ready: make(chan struct{})}
		tc.flows[c] = tce
	}
	if !tce.writer {
		// TODO(mattr): This is a very course garbage collection policy.
		// Every 1000 connections we clean out expired entries.
		if tc.writes++; tc.writes%typeCacheCollectInterval == 0 {
			go tc.collect()
		}
		tce.writer = true
		write = func(f flow.Flow, c context.CancelFunc) {
			if f != nil {
				tce.cancel = c
				tce.enc = vom.NewTypeEncoder(f)
				tce.dec = vom.NewTypeDecoder(f)
				tce.dec.Start() // Stopped in collect()
			}
			close(tce.ready)
		}
	}
	tc.mu.Unlock()
	return
}

func (tc *typeCache) get(ctx *context.T, c flow.ManagedConn) (*vom.TypeEncoder, *vom.TypeDecoder, error) {
	tc.mu.Lock()
	tce := tc.flows[c]
	if tce == nil {
		tce = &typeCacheEntry{ready: make(chan struct{})}
		tc.flows[c] = tce
	}
	tc.mu.Unlock()
	select {
	case <-c.Closed():
		return nil, nil, newErrTypeFlowFailure(ctx, nil)
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	case <-tce.ready:
		if tce.enc == nil || tce.dec == nil {
			return nil, nil, newErrTypeFlowFailure(ctx, nil)
		}
	}
	return tce.enc, tce.dec, nil
}

func (tc *typeCache) collect() {
	tc.mu.Lock()
	conns := make([]flow.ManagedConn, len(tc.flows))
	i := 0
	for c, _ := range tc.flows {
		conns[i] = c
		i++
	}
	tc.mu.Unlock()

	last := 0
	for _, c := range conns {
		select {
		case <-c.Closed():
			conns[last] = c
			last++
		default:
		}
	}

	if last > 0 {
		tc.mu.Lock()
		for idx := 0; idx < last; idx++ {
			conn := conns[idx]
			tce := tc.flows[conn]
			if tce != nil {
				if tce.cancel != nil {
					tce.cancel()
				}
				tce.dec.Stop()
				delete(tc.flows, conn)
			}
		}
		tc.mu.Unlock()
	}
}
