// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package logging contains a Google Cloud Logging client.
//
// This package is experimental and subject to API changes.
package logging // import "google.golang.org/cloud/logging"

import (
	"errors"
	"io"
	"log"
	"sync"
	"time"

	"golang.org/x/net/context"
	api "google.golang.org/api/logging/v1beta3"
	"google.golang.org/cloud"
	"google.golang.org/cloud/internal/transport"
)

// Scope is the OAuth2 scope necessary to use Google Cloud Logging.
// It is the write scope exported by the generated API package, and
// NewClient passes it to cloud.WithScopes as a default option.
const Scope = api.LoggingWriteScope

// Level is the log level.
type Level int

const (
	// Default means no assigned severity level.
	Default Level = iota
	// Debug is reported with severity "DEBUG".
	Debug
	// Info is reported with severity "INFO".
	Info
	// Warning is reported with severity "WARNING".
	Warning
	// Error is reported with severity "ERROR".
	Error
	// Critical is reported with severity "CRITICAL".
	Critical
	// Alert is reported with severity "ALERT".
	Alert
	// Emergency is reported with severity "EMERGENCY".
	Emergency
	// nLevel is the number of defined levels; it sizes the per-level tables.
	nLevel
)

// levelName maps each defined Level to the severity string returned by
// Level.String. Default maps to the empty string.
var levelName = [nLevel]string{
	Default:   "",
	Debug:     "DEBUG",
	Info:      "INFO",
	Warning:   "WARNING",
	Error:     "ERROR",
	Critical:  "CRITICAL",
	Alert:     "ALERT",
	Emergency: "EMERGENCY",
}

// String returns the severity name of v, such as "INFO" or "ERROR".
//
// It returns the empty string for Default and for any value outside the
// defined range (negative, or nLevel and above). Out-of-range values are
// checked explicitly because indexing levelName with them would panic.
func (v Level) String() string {
	if v < 0 || v >= nLevel {
		return ""
	}
	return levelName[v]
}

// Client is a Google Cloud Logging client.
// It must be constructed via NewClient.
//
// Entries passed to Log are queued in memory and sent in batches;
// the exported fields below tune that buffering. They may be set
// after NewClient returns but before the client is used for logging.
type Client struct {
	// Set once by NewClient.
	svc     *api.Service
	logs    *api.ProjectsLogsEntriesService
	projID  string
	logName string
	writer  [nLevel]io.Writer
	logger  [nLevel]*log.Logger

	// mu guards the buffering state below.
	mu          sync.Mutex
	queued      []*api.LogEntry
	curFlush    *flushCall  // currently in-flight flush
	flushTimer  *time.Timer // nil before first use
	timerActive bool        // whether flushTimer is armed
	inFlight    int         // number of log entries sent to API service but not yet ACKed

	// For testing:
	timeNow func() time.Time // if nil, time.Now is used

	// ServiceName may be "appengine.googleapis.com",
	// "compute.googleapis.com" or "custom.googleapis.com".
	//
	// The default is "custom.googleapis.com".
	//
	// The service name is only used by the API server to
	// determine which of the labels are used to index the logs.
	ServiceName string

	// CommonLabels are metadata labels that apply to all log
	// entries in this request, so that you don't have to repeat
	// them in each log entry's metadata.labels field. If any of
	// the log entries contains a (key, value) with the same key
	// that is in CommonLabels, then the entry's (key, value)
	// overrides the one in CommonLabels.
	CommonLabels map[string]string

	// BufferLimit is the maximum number of entries the client
	// holds at once, counting both entries still queued and
	// entries in a request that has not completed yet. Once that
	// many are held, Log does not queue the new entry and reports
	// an overflow instead (see Overflow).
	// Zero or a negative value selects the default, currently
	// 10,000. To request a flush after every entry, set
	// FlushAfter to 1 rather than lowering BufferLimit.
	BufferLimit int

	// FlushAfter optionally specifies a threshold count at which buffered
	// log entries are flushed, even if the BufferInterval has not yet
	// been reached.
	// Zero or a negative value selects the default, currently 10.
	FlushAfter int

	// BufferInterval is the maximum amount of time that an item
	// should remain buffered in memory before being flushed to
	// the logging service.
	// Zero or a negative value selects the default, currently
	// 1 second.
	BufferInterval time.Duration

	// Overflow is a function which runs when the Log function
	// overflows its configured buffer limit. It is called with
	// the client and the entry that could not be queued, and
	// its return value is returned by Log. If Overflow is nil,
	// the log entry is dropped and Log returns ErrOverflow.
	Overflow func(*Client, Entry) error
}

// flushAfter reports the queue length at which Log asks for an
// immediate flush: c.FlushAfter when it is positive, otherwise 10.
func (c *Client) flushAfter() int {
	const defaultFlushAfter = 10
	if c.FlushAfter <= 0 {
		return defaultFlushAfter
	}
	return c.FlushAfter
}

// bufferInterval reports how long an entry may wait before a flush
// is due: c.BufferInterval when it is positive, otherwise one second.
func (c *Client) bufferInterval() time.Duration {
	const defaultInterval = time.Second
	if c.BufferInterval <= 0 {
		return defaultInterval
	}
	return c.BufferInterval
}

// bufferLimit reports the maximum number of entries that may be
// queued or in flight: c.BufferLimit when it is positive, otherwise
// 10000.
func (c *Client) bufferLimit() int {
	const defaultLimit = 10000
	if c.BufferLimit <= 0 {
		return defaultLimit
	}
	return c.BufferLimit
}

// serviceName reports the service name placed in each entry's
// metadata: c.ServiceName when it is set, otherwise
// "custom.googleapis.com".
func (c *Client) serviceName() string {
	if c.ServiceName == "" {
		return "custom.googleapis.com"
	}
	return c.ServiceName
}

// now reports the current time, using the c.timeNow test hook when
// one is installed and time.Now otherwise.
func (c *Client) now() time.Time {
	if c.timeNow == nil {
		return time.Now()
	}
	return c.timeNow()
}

// Writer returns the io.Writer associated with log level v.
//
// Every Write call on the returned Writer produces one log entry at
// that level.
//
// The writers are stored on the Client, so this accessor only looks
// one up and does not allocate; callers need not cache the result.
func (c *Client) Writer(v Level) io.Writer {
	w := c.writer[v]
	return w
}

// Logger returns the *log.Logger associated with log level v.
//
// NewClient pre-allocates one Logger per Level, each with an empty
// prefix and no flags, so this accessor does not allocate.
// The same Logger is returned on every call for a given level:
// callers that want different flags (such as log.Lshortfile) may call
// SetFlags on it, but that change is seen by every other user of the
// Logger for that level.
func (c *Client) Logger(v Level) *log.Logger {
	l := c.logger[v]
	return l
}

// levelWriter is an io.Writer that turns each Write into a log entry
// at a fixed level on a Client.
type levelWriter struct {
	level Level
	c     *Client
}

// Write queues p, converted to a string, as the payload of a single
// entry at w.level. It always reports len(p) bytes written, together
// with whatever error Client.Log returned.
func (w levelWriter) Write(p []byte) (n int, err error) {
	entry := Entry{
		Payload: string(p),
		Level:   w.level,
	}
	err = w.c.Log(entry)
	n = len(p)
	return n, err
}

// Entry is a log entry, as passed to Client.Log and Client.LogSync.
type Entry struct {
	// Time is the time of the entry. If the zero value, the current time is used.
	Time time.Time

	// Level is log entry's severity level.
	// The zero value means no assigned severity level.
	Level Level

	// Payload must be either a string, []byte, or something that
	// marshals via the encoding/json package to a JSON object
	// (and not any other type of JSON value).
	// A string or []byte is sent as the entry's text payload; any
	// other value is sent as its struct payload.
	Payload interface{}

	// Labels optionally specifies key/value labels for the log entry.
	// Depending on the Client's ServiceName, these are indexed differently
	// by the Cloud Logging Service.
	// See https://cloud.google.com/logging/docs/logs_index
	// The Client.Log method takes ownership of this map.
	Labels map[string]string

	// TODO: de-duping id
}

// apiEntry converts e into the wire representation used by the
// logging API.
//
// A zero e.Time is replaced by the client's current time, and the
// timestamp is formatted in UTC as RFC 3339 with nanoseconds. A string
// or []byte payload becomes the text payload; any other payload is
// passed through as the struct payload. The returned error is
// currently always nil.
func (c *Client) apiEntry(e Entry) (*api.LogEntry, error) {
	when := e.Time
	if when.IsZero() {
		when = c.now()
	}

	meta := &api.LogEntryMetadata{
		Timestamp:   when.UTC().Format(time.RFC3339Nano),
		ServiceName: c.serviceName(),
		Severity:    e.Level.String(),
		Labels:      e.Labels,
	}
	out := &api.LogEntry{Metadata: meta}

	if text, ok := e.Payload.(string); ok {
		out.TextPayload = text
	} else if raw, ok := e.Payload.([]byte); ok {
		out.TextPayload = string(raw)
	} else {
		out.StructPayload = api.LogEntryStructPayload(e.Payload)
	}
	return out, nil
}

// LogSync sends e to the logging service immediately, bypassing the
// buffer, and returns once the request has completed.
// It is mostly intended for debugging or critical errors.
//
// The returned error is either the conversion error for e or the
// error from the write request.
func (c *Client) LogSync(e Entry) error {
	ent, err := c.apiEntry(e)
	if err != nil {
		return err
	}

	req := &api.WriteLogEntriesRequest{
		CommonLabels: c.CommonLabels,
		Entries:      []*api.LogEntry{ent},
	}
	if _, err := c.logs.Write(c.projID, c.logName, req).Do(); err != nil {
		return err
	}
	return nil
}

// ErrOverflow is returned by Client.Log when the buffer limit has been
// reached and the Client has no Overflow function set.
var ErrOverflow = errors.New("logging: log entry overflowed buffer limits")

// Log queues an entry to be sent to the logging service, subject to
// the Client's buffering parameters. With the default settings the
// entry is flushed within one second.
//
// Log returns an error only if the entry is invalid or the queue is at
// capacity. When the queue is at capacity the entry is not added, and
// Log returns ErrOverflow if c.Overflow is nil, or otherwise the value
// returned by c.Overflow.
func (c *Client) Log(e Entry) error {
	ent, err := c.apiEntry(e)
	if err != nil {
		return err
	}

	c.mu.Lock()
	if len(c.queued)+c.inFlight >= c.bufferLimit() {
		// Release the lock before running user code.
		c.mu.Unlock()
		overflow := c.Overflow
		if overflow == nil {
			return ErrOverflow
		}
		return overflow(c, e)
	}
	defer c.mu.Unlock()

	c.queued = append(c.queued, ent)

	// Flush right away once enough entries have piled up; otherwise
	// let the entry wait for at most the buffer interval.
	if len(c.queued) >= c.flushAfter() {
		c.scheduleFlushLocked(0)
	} else {
		c.scheduleFlushLocked(c.bufferInterval())
	}
	return nil
}

// scheduleFlushLocked arranges for timeoutFlush to run after d.
// c.mu must be held.
//
// Callers pass 0 for an immediate flush, the buffer interval for a
// regular one, or a retry delay after a failed flush. Nothing is
// scheduled while a request is in flight.
func (c *Client) scheduleFlushLocked(d time.Duration) {
	// For now to keep things simple, only allow one HTTP request in
	// flight at a time.
	if c.inFlight > 0 {
		return
	}

	// First flush: create the timer.
	if c.flushTimer == nil {
		c.timerActive = true
		c.flushTimer = time.AfterFunc(d, c.timeoutFlush)
		return
	}

	// The timer exists but is idle: arm it for d.
	if !c.timerActive {
		c.timerActive = true
		c.flushTimer.Reset(d)
		return
	}

	// The timer is already armed. Only pull it forward when an
	// immediate flush is requested, e.g. when moving from the
	// 1 second flush after the 1st item to an immediate flush after
	// the 10th; otherwise let it fire as previously scheduled.
	if d == 0 {
		c.flushTimer.Reset(0)
	}
}

// timeoutFlush is the flush timer's callback. It runs in its own
// goroutine (started by time.AfterFunc), marks the timer as no longer
// armed, and flushes c.queued. If the flush fails, another attempt is
// scheduled five seconds later.
func (c *Client) timeoutFlush() {
	c.mu.Lock()
	c.timerActive = false
	c.mu.Unlock()

	err := c.Flush()
	if err == nil {
		return
	}

	// schedule another try
	// TODO: smarter back-off?
	c.mu.Lock()
	defer c.mu.Unlock()
	c.scheduleFlushLocked(5 * time.Second)
}

// Ping reports whether the client's connection to Google Cloud
// Logging and the authentication configuration are valid.
// It does so by issuing a write request that carries no entries and
// returning that request's error.
func (c *Client) Ping() error {
	emptyReq := &api.WriteLogEntriesRequest{
		Entries: []*api.LogEntry{},
	}
	call := c.logs.Write(c.projID, c.logName, emptyReq)
	_, err := call.Do()
	return err
}

// Flush flushes any buffered log entries.
//
// It blocks until the request that is already in flight (if any) has
// finished and, if entries were still queued at that point, until one
// further request carrying them has finished. It returns the error of
// the last request it waited on, or nil if there was nothing to flush.
func (c *Client) Flush() error {
	// numFlush counts the in-flight flushes this call has waited on.
	var numFlush int
	c.mu.Lock()
	for {
		// We're already flushing (or we just started flushing
		// ourselves), so wait for it to finish.
		if f := c.curFlush; f != nil {
			wasEmpty := len(c.queued) == 0
			c.mu.Unlock()
			<-f.donec // wait for it
			numFlush++
			// Terminate whenever there's an error, we've
			// already flushed twice (one that was already
			// in-flight when flush was called, and then
			// one we instigated), or the queue was empty
			// when we released the lock (meaning this
			// in-flight flush removes everything present
			// when Flush was called, and we don't need to
			// kick off a new flush for things arriving
			// afterward)
			if f.err != nil || numFlush == 2 || wasEmpty {
				return f.err
			}
			// Otherwise, re-obtain the lock and loop,
			// starting over with seeing if a flush is in
			// progress, which might've been started by a
			// different goroutine before acquiring this
			// lock again.
			c.mu.Lock()
			continue
		}

		// Terminal case:
		if len(c.queued) == 0 {
			c.mu.Unlock()
			return nil
		}

		// Nothing in flight but entries are queued: start a flush
		// and loop around to wait for it in the branch above.
		c.startFlushLocked()
	}
}

// startFlushLocked takes everything in c.queued and sends it to the
// logging service in a new goroutine, recording the call in c.curFlush.
// It returns without waiting for the request.
//
// requires c.mu be held.
//
// It panics if a flush is already in flight or if nothing is queued;
// callers must check both before calling.
func (c *Client) startFlushLocked() {
	if c.curFlush != nil {
		panic("internal error: flush already in flight")
	}
	if len(c.queued) == 0 {
		panic("internal error: no items queued")
	}
	// Take ownership of the queued entries; entries logged from now on
	// accumulate in a fresh queue.
	logEntries := c.queued
	c.inFlight = len(logEntries)
	c.queued = nil

	flush := &flushCall{
		donec: make(chan struct{}),
	}
	c.curFlush = flush
	go func() {
		// Deferred calls run last-in first-out, so c.mu is released
		// (deferred below) before donec is closed: by the time a
		// waiter wakes up, the client state has been updated.
		defer close(flush.donec)
		_, err := c.logs.Write(c.projID, c.logName, &api.WriteLogEntriesRequest{
			CommonLabels: c.CommonLabels,
			Entries:      logEntries,
		}).Do()
		flush.err = err
		c.mu.Lock()
		defer c.mu.Unlock()
		c.inFlight = 0
		c.curFlush = nil
		if err != nil {
			// Put the unsent entries back, after anything queued in
			// the meantime. No retry is scheduled here.
			c.queued = append(c.queued, logEntries...)
		} else if len(c.queued) > 0 {
			// Entries arrived while the request was in flight, when
			// scheduling was suppressed; schedule their flush now.
			c.scheduleFlushLocked(c.bufferInterval())
		}
	}()

}

const (
	// prodAddr is the endpoint NewClient uses unless an option
	// overrides it.
	prodAddr = "https://logging.googleapis.com/"

	// userAgent is the user agent NewClient uses unless an option
	// overrides it.
	userAgent = "gcloud-golang-logging/20150922"
)

// NewClient returns a new log client that writes to the log named
// logName in the project projectID.
//
// The default endpoint, scope and user agent are applied first, and
// opts are appended after them. An error is returned if the HTTP
// client or the API service cannot be created.
//
// The exported fields on the returned client may be modified before
// the client is used for logging. Once log entries are in flight,
// the fields must not be modified.
func NewClient(ctx context.Context, projectID, logName string, opts ...cloud.ClientOption) (*Client, error) {
	defaults := []cloud.ClientOption{
		cloud.WithEndpoint(prodAddr),
		cloud.WithScopes(Scope),
		cloud.WithUserAgent(userAgent),
	}
	httpClient, endpoint, err := transport.NewHTTPClient(ctx, append(defaults, opts...)...)
	if err != nil {
		return nil, err
	}

	svc, err := api.New(httpClient)
	if err != nil {
		return nil, err
	}
	svc.BasePath = endpoint

	c := &Client{
		projID:  projectID,
		logName: logName,
		svc:     svc,
		logs:    api.NewProjectsLogsEntriesService(svc),
	}

	// Pre-allocate one writer and one logger per level so that the
	// Writer and Logger accessors never allocate.
	for lv := Level(0); lv < nLevel; lv++ {
		w := levelWriter{lv, c}
		c.writer[lv] = w
		c.logger[lv] = log.New(w, "", 0)
	}
	return c, nil
}

// flushCall is an in-flight or completed flush.
// The flushing goroutine sets err and then closes donec, so waiters
// must receive from donec before reading err.
type flushCall struct {
	donec chan struct{} // closed when response is in
	err   error         // result of the write; valid only after donec is closed
}
