// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"sync"
	"time"

	"golang.org/x/net/context"
)

// ackBuffer accumulates ack IDs that are waiting to be acknowledged. Once
// notifications have been enabled with SendNotifications, it signals on Dirty
// every time the buffer goes from empty to non-empty.
type ackBuffer struct {
	// Dirty receives a value when the buffer becomes non-empty while
	// notifications are enabled.
	Dirty chan struct{}
	// Close Done when the ackBuffer is no longer needed; this releases any
	// notification goroutine that is still waiting to send on Dirty.
	Done chan struct{}

	mu      sync.Mutex
	pending []string // ack IDs added since the last RemoveAll; guarded by mu
	send    bool     // whether Dirty notifications are enabled; guarded by mu
}

// Add appends ackID to the buffer. If notifications are enabled and the
// buffer was empty beforehand, a Dirty notification is sent.
func (buf *ackBuffer) Add(ackID string) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	wasEmpty := len(buf.pending) == 0
	buf.pending = append(buf.pending, ackID)
	if !buf.send || !wasEmpty {
		return
	}
	// The buffer just went from empty to non-empty: announce it.
	buf.notify()
}

// RemoveAll empties the buffer and returns the ack IDs it held, in the order
// in which they were added.
func (buf *ackBuffer) RemoveAll() []string {
	buf.mu.Lock()
	drained := buf.pending
	buf.pending = nil
	buf.mu.Unlock()
	return drained
}

// SendNotifications turns on Dirty notifications for empty -> non-empty
// transitions. If the buffer already holds ack IDs, one notification is sent
// straight away.
func (buf *ackBuffer) SendNotifications() {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	buf.send = true
	if len(buf.pending) == 0 {
		return
	}
	// IDs were buffered before notifications were enabled; announce them now.
	buf.notify()
}

// notify sends a value on Dirty from a new goroutine, so that the caller
// (which holds buf.mu) does not block waiting for a receiver. The goroutine
// gives up without sending if Done is closed first.
func (buf *ackBuffer) notify() {
	go func(b *ackBuffer) {
		select {
		case <-b.Done:
		case b.Dirty <- struct{}{}:
		}
	}(buf)
}

// acker acks messages in batches.
// Ack IDs queued via Ack are held in the embedded ackBuffer; the goroutine
// launched by Start drains that buffer and acknowledges its contents through
// Client, either on every AckTick or, after FastMode, whenever the buffer
// signals that it has become non-empty.
type acker struct {
	Client  *Client
	Ctx     context.Context  // The context to use when acknowledging messages.
	Sub     string           // The full name of the subscription.
	AckTick <-chan time.Time // AckTick supplies the frequency with which to make ack requests.

	// Notify is called with an ack ID after the message with that ack ID
	// has been processed.  An ackID is considered to have been processed
	// if at least one attempt has been made to acknowledge it.
	Notify func(string)

	// ackBuffer holds the ack IDs queued via Ack until they are acknowledged.
	ackBuffer

	// wg tracks the goroutine launched by Start; done is created by Start
	// and closed by Stop to shut that goroutine down.
	wg   sync.WaitGroup
	done chan struct{}
}

// Start initiates processing of ackIDs which are added via Ack.
// It creates the done and Dirty channels and launches a goroutine that
// acknowledges the buffered ackIDs on every AckTick and on every Dirty
// notification, until Stop is called.
// Notify is called with each ackID once it has been processed.
func (a *acker) Start() {
	buf := &a.ackBuffer

	a.done = make(chan struct{})
	buf.Done = a.done
	buf.Dirty = make(chan struct{})

	// flush acknowledges everything that is currently buffered.
	flush := func() { a.ack(buf.RemoveAll()) }

	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for {
			select {
			case <-a.done:
				return
			case <-a.AckTick:
				flush()
			case <-buf.Dirty:
				flush()
			}
		}
	}()
}

// Ack queues ackID in the buffer so that it is acknowledged with the next batch.
func (a *acker) Ack(ackID string) {
	// Add is promoted from the embedded ackBuffer.
	a.Add(ackID)
}

// FastMode switches the acker into a mode in which messages are acked as they
// arrive instead of only when a.AckTick fires. It does so by enabling the
// buffer's Dirty notifications.
func (a *acker) FastMode() {
	// SendNotifications is promoted from the embedded ackBuffer.
	a.SendNotifications()
}

// Stop shuts the acker down. Ack IDs that are still pending are dropped, and
// Stop returns only after the processing goroutine has exited.
func (a *acker) Stop() {
	// Runs after close below: block until the processing goroutine returns.
	defer a.wg.Wait()
	// Closing done ends the processing loop.
	close(a.done)
}

// maxAckRetries is the number of additional attempts ack makes for a batch of
// ack IDs whose acknowledge request failed, before moving on to the next batch.
const maxAckRetries = 1

// ack acknowledges the supplied ackIDs, sending them in the batches produced
// by splitAckIDs. A batch whose request fails is retried up to maxAckRetries
// times and is then given up on.
// After all batches have been attempted (regardless of success or failure),
// every id in ids is passed to a.Notify.
func (a *acker) ack(ids []string) {
	for batch, rest := a.Client.s.splitAckIDs(ids); len(batch) > 0; batch, rest = a.Client.s.splitAckIDs(rest) {
		// TODO(mcgreevy): more sophisticated retry on failure.
		// NOTE: it is not incorrect to drop acks if we decide not to retry; the messages
		//  will be redelivered, but this is a documented behaviour of the API.
		for attempt := 0; ; attempt++ {
			err := a.Client.s.acknowledge(a.Ctx, a.Sub, batch)
			if err == nil || attempt >= maxAckRetries {
				break
			}
		}
	}
	for i := range ids {
		a.Notify(ids[i])
	}
}
