| // Copyright 2011 Google Inc. All rights reserved. |
| // Use of this source code is governed by the Apache 2.0 |
| // license that can be found in the LICENSE file. |
| |
| /* |
| Package taskqueue provides a client for App Engine's taskqueue service. |
| Using this service, applications may perform work outside a user's request. |
| |
| A Task may be constructed manually; alternatively, since the most common |
| taskqueue operation is to add a single POST task, NewPOSTTask makes it easy. |
| |
| t := taskqueue.NewPOSTTask("/worker", url.Values{ |
| "key": {key}, |
| }) |
| taskqueue.Add(c, t, "") // add t to the default queue |
| */ |
| package taskqueue // import "google.golang.org/appengine/taskqueue" |
| |
| import ( |
| "errors" |
| "fmt" |
| "net/http" |
| "net/url" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "golang.org/x/net/context" |
| |
| "google.golang.org/appengine" |
| "google.golang.org/appengine/internal" |
| dspb "google.golang.org/appengine/internal/datastore" |
| pb "google.golang.org/appengine/internal/taskqueue" |
| ) |
| |
// ErrTaskAlreadyAdded is returned by Add and AddMulti when a task with the
// requested name has already been added.
var ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added")
| |
// RetryOptions controls whether a task is retried and how long the service
// backs off between tries.
type RetryOptions struct {
	// RetryLimit is the number of tries/leases after which the task fails
	// permanently and is deleted. When AgeLimit is set too, the task only
	// fails permanently once both limits have been exceeded.
	RetryLimit int32

	// AgeLimit is the maximum time, counted from the task's first try, before
	// the task fails permanently and is deleted (push tasks only). When
	// RetryLimit is set too, the task only fails permanently once both limits
	// have been exceeded.
	AgeLimit time.Duration

	// MinBackoff is the shortest wait between successive tries (push tasks
	// only).
	MinBackoff time.Duration

	// MaxBackoff is the longest wait between successive tries (push tasks
	// only).
	MaxBackoff time.Duration

	// MaxDoublings caps how many times the interval between successive tries
	// is doubled before the intervals start growing linearly (push tasks
	// only).
	MaxDoublings int32

	// ApplyZeroMaxDoublings makes a zero MaxDoublings take effect. Without
	// it, a zero MaxDoublings is ignored and the non-zero default is used.
	ApplyZeroMaxDoublings bool
}
| |
| // toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters. |
| func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters { |
| params := &pb.TaskQueueRetryParameters{} |
| if opt.RetryLimit > 0 { |
| params.RetryLimit = proto.Int32(opt.RetryLimit) |
| } |
| if opt.AgeLimit > 0 { |
| params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds())) |
| } |
| if opt.MinBackoff > 0 { |
| params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds()) |
| } |
| if opt.MaxBackoff > 0 { |
| params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds()) |
| } |
| if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) { |
| params.MaxDoublings = proto.Int32(opt.MaxDoublings) |
| } |
| return params |
| } |
| |
| // A Task represents a task to be executed. |
| type Task struct { |
| // Path is the worker URL for the task. |
| // If unset, it will default to /_ah/queue/<queue_name>. |
| Path string |
| |
| // Payload is the data for the task. |
| // This will be delivered as the HTTP request body. |
| // It is only used when Method is POST, PUT or PULL. |
| // url.Values' Encode method may be used to generate this for POST requests. |
| Payload []byte |
| |
| // Additional HTTP headers to pass at the task's execution time. |
| // To schedule the task to be run with an alternate app version |
| // or backend, set the "Host" header. |
| Header http.Header |
| |
| // Method is the HTTP method for the task ("GET", "POST", etc.), |
| // or "PULL" if this is task is destined for a pull-based queue. |
| // If empty, this defaults to "POST". |
| Method string |
| |
| // A name for the task. |
| // If empty, a name will be chosen. |
| Name string |
| |
| // Delay specifies the duration the task queue service must wait |
| // before executing the task. |
| // Either Delay or ETA may be set, but not both. |
| Delay time.Duration |
| |
| // ETA specifies the earliest time a task may be executed (push queues) |
| // or leased (pull queues). |
| // Either Delay or ETA may be set, but not both. |
| ETA time.Time |
| |
| // The number of times the task has been dispatched or leased. |
| RetryCount int32 |
| |
| // Tag for the task. Only used when Method is PULL. |
| Tag string |
| |
| // Retry options for this task. May be nil. |
| RetryOptions *RetryOptions |
| } |
| |
| func (t *Task) method() string { |
| if t.Method == "" { |
| return "POST" |
| } |
| return t.Method |
| } |
| |
| // NewPOSTTask creates a Task that will POST to a path with the given form data. |
| func NewPOSTTask(path string, params url.Values) *Task { |
| h := make(http.Header) |
| h.Set("Content-Type", "application/x-www-form-urlencoded") |
| return &Task{ |
| Path: path, |
| Payload: []byte(params.Encode()), |
| Header: h, |
| Method: "POST", |
| } |
| } |
| |
// Canonical header keys used to pass namespace information along with
// HTTP-based tasks.
var currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
var defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
| |
| func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) { |
| if queueName == "" { |
| queueName = "default" |
| } |
| eta := task.ETA |
| if eta.IsZero() { |
| eta = time.Now().Add(task.Delay) |
| } else if task.Delay != 0 { |
| panic("taskqueue: both Delay and ETA are set") |
| } |
| req := &pb.TaskQueueAddRequest{ |
| QueueName: []byte(queueName), |
| TaskName: []byte(task.Name), |
| EtaUsec: proto.Int64(eta.UnixNano() / 1e3), |
| } |
| method := task.method() |
| if method == "PULL" { |
| // Pull-based task |
| req.Body = task.Payload |
| req.Mode = pb.TaskQueueMode_PULL.Enum() |
| if task.Tag != "" { |
| req.Tag = []byte(task.Tag) |
| } |
| } else { |
| // HTTP-based task |
| if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok { |
| req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum() |
| } else { |
| return nil, fmt.Errorf("taskqueue: bad method %q", method) |
| } |
| req.Url = []byte(task.Path) |
| for k, vs := range task.Header { |
| for _, v := range vs { |
| req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ |
| Key: []byte(k), |
| Value: []byte(v), |
| }) |
| } |
| } |
| if method == "POST" || method == "PUT" { |
| req.Body = task.Payload |
| } |
| |
| // Namespace headers. |
| if _, ok := task.Header[currentNamespace]; !ok { |
| // Fetch the current namespace of this request. |
| ns := internal.NamespaceFromContext(c) |
| req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ |
| Key: []byte(currentNamespace), |
| Value: []byte(ns), |
| }) |
| } |
| if _, ok := task.Header[defaultNamespace]; !ok { |
| // Fetch the X-AppEngine-Default-Namespace header of this request. |
| if ns := getDefaultNamespace(c); ns != "" { |
| req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{ |
| Key: []byte(defaultNamespace), |
| Value: []byte(ns), |
| }) |
| } |
| } |
| } |
| |
| if task.RetryOptions != nil { |
| req.RetryParameters = task.RetryOptions.toRetryParameters() |
| } |
| |
| return req, nil |
| } |
| |
| var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{ |
| pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true, |
| pb.TaskQueueServiceError_TOMBSTONED_TASK: true, |
| } |
| |
| // Add adds the task to a named queue. |
| // An empty queue name means that the default queue will be used. |
| // Add returns an equivalent Task with defaults filled in, including setting |
| // the task's Name field to the chosen name if the original was empty. |
| func Add(c context.Context, task *Task, queueName string) (*Task, error) { |
| req, err := newAddReq(c, task, queueName) |
| if err != nil { |
| return nil, err |
| } |
| res := &pb.TaskQueueAddResponse{} |
| if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil { |
| apiErr, ok := err.(*internal.APIError) |
| if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] { |
| return nil, ErrTaskAlreadyAdded |
| } |
| return nil, err |
| } |
| resultTask := *task |
| resultTask.Method = task.method() |
| if task.Name == "" { |
| resultTask.Name = string(res.ChosenTaskName) |
| } |
| return &resultTask, nil |
| } |
| |
| // AddMulti adds multiple tasks to a named queue. |
| // An empty queue name means that the default queue will be used. |
| // AddMulti returns a slice of equivalent tasks with defaults filled in, including setting |
| // each task's Name field to the chosen name if the original was empty. |
| // If a given task is badly formed or could not be added, an appengine.MultiError is returned. |
| func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) { |
| req := &pb.TaskQueueBulkAddRequest{ |
| AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)), |
| } |
| me, any := make(appengine.MultiError, len(tasks)), false |
| for i, t := range tasks { |
| req.AddRequest[i], me[i] = newAddReq(c, t, queueName) |
| any = any || me[i] != nil |
| } |
| if any { |
| return nil, me |
| } |
| res := &pb.TaskQueueBulkAddResponse{} |
| if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil { |
| return nil, err |
| } |
| if len(res.Taskresult) != len(tasks) { |
| return nil, errors.New("taskqueue: server error") |
| } |
| tasksOut := make([]*Task, len(tasks)) |
| for i, tr := range res.Taskresult { |
| tasksOut[i] = new(Task) |
| *tasksOut[i] = *tasks[i] |
| tasksOut[i].Method = tasksOut[i].method() |
| if tasksOut[i].Name == "" { |
| tasksOut[i].Name = string(tr.ChosenTaskName) |
| } |
| if *tr.Result != pb.TaskQueueServiceError_OK { |
| if alreadyAddedErrors[*tr.Result] { |
| me[i] = ErrTaskAlreadyAdded |
| } else { |
| me[i] = &internal.APIError{ |
| Service: "taskqueue", |
| Code: int32(*tr.Result), |
| } |
| } |
| any = true |
| } |
| } |
| if any { |
| return tasksOut, me |
| } |
| return tasksOut, nil |
| } |
| |
| // Delete deletes a task from a named queue. |
| func Delete(c context.Context, task *Task, queueName string) error { |
| err := DeleteMulti(c, []*Task{task}, queueName) |
| if me, ok := err.(appengine.MultiError); ok { |
| return me[0] |
| } |
| return err |
| } |
| |
| // DeleteMulti deletes multiple tasks from a named queue. |
| // If a given task could not be deleted, an appengine.MultiError is returned. |
| func DeleteMulti(c context.Context, tasks []*Task, queueName string) error { |
| taskNames := make([][]byte, len(tasks)) |
| for i, t := range tasks { |
| taskNames[i] = []byte(t.Name) |
| } |
| if queueName == "" { |
| queueName = "default" |
| } |
| req := &pb.TaskQueueDeleteRequest{ |
| QueueName: []byte(queueName), |
| TaskName: taskNames, |
| } |
| res := &pb.TaskQueueDeleteResponse{} |
| if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil { |
| return err |
| } |
| if a, b := len(req.TaskName), len(res.Result); a != b { |
| return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b) |
| } |
| me, any := make(appengine.MultiError, len(res.Result)), false |
| for i, ec := range res.Result { |
| if ec != pb.TaskQueueServiceError_OK { |
| me[i] = &internal.APIError{ |
| Service: "taskqueue", |
| Code: int32(ec), |
| } |
| any = true |
| } |
| } |
| if any { |
| return me |
| } |
| return nil |
| } |
| |
| func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) { |
| if queueName == "" { |
| queueName = "default" |
| } |
| req := &pb.TaskQueueQueryAndOwnTasksRequest{ |
| QueueName: []byte(queueName), |
| LeaseSeconds: proto.Float64(float64(leaseTime)), |
| MaxTasks: proto.Int64(int64(maxTasks)), |
| GroupByTag: proto.Bool(groupByTag), |
| Tag: tag, |
| } |
| res := &pb.TaskQueueQueryAndOwnTasksResponse{} |
| if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil { |
| return nil, err |
| } |
| tasks := make([]*Task, len(res.Task)) |
| for i, t := range res.Task { |
| tasks[i] = &Task{ |
| Payload: t.Body, |
| Name: string(t.TaskName), |
| Method: "PULL", |
| ETA: time.Unix(0, *t.EtaUsec*1e3), |
| RetryCount: *t.RetryCount, |
| Tag: string(t.Tag), |
| } |
| } |
| return tasks, nil |
| } |
| |
| // Lease leases tasks from a queue. |
| // leaseTime is in seconds. |
| // The number of tasks fetched will be at most maxTasks. |
| func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) { |
| return lease(c, maxTasks, queueName, leaseTime, false, nil) |
| } |
| |
| // LeaseByTag leases tasks from a queue, grouped by tag. |
| // If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA. |
| // leaseTime is in seconds. |
| // The number of tasks fetched will be at most maxTasks. |
| func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) { |
| return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag)) |
| } |
| |
| // Purge removes all tasks from a queue. |
| func Purge(c context.Context, queueName string) error { |
| if queueName == "" { |
| queueName = "default" |
| } |
| req := &pb.TaskQueuePurgeQueueRequest{ |
| QueueName: []byte(queueName), |
| } |
| res := &pb.TaskQueuePurgeQueueResponse{} |
| return internal.Call(c, "taskqueue", "PurgeQueue", req, res) |
| } |
| |
| // ModifyLease modifies the lease of a task. |
| // Used to request more processing time, or to abandon processing. |
| // leaseTime is in seconds and must not be negative. |
| func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error { |
| if queueName == "" { |
| queueName = "default" |
| } |
| req := &pb.TaskQueueModifyTaskLeaseRequest{ |
| QueueName: []byte(queueName), |
| TaskName: []byte(task.Name), |
| EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership. |
| LeaseSeconds: proto.Float64(float64(leaseTime)), |
| } |
| res := &pb.TaskQueueModifyTaskLeaseResponse{} |
| if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil { |
| return err |
| } |
| task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3) |
| return nil |
| } |
| |
// QueueStatistics holds statistics about a single task queue.
type QueueStatistics struct {
	// Tasks is the number of tasks in the queue; it may be an approximation.
	Tasks int
	// OldestETA is zero if there are no pending tasks.
	OldestETA time.Time

	// Executed1Minute is the number of tasks executed in the last minute.
	Executed1Minute int
	// InFlight is the number of tasks executing now.
	InFlight int
	// EnforcedRate is expressed in requests per second.
	EnforcedRate float64
}
| |
| // QueueStats retrieves statistics about queues. |
| func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) { |
| req := &pb.TaskQueueFetchQueueStatsRequest{ |
| QueueName: make([][]byte, len(queueNames)), |
| } |
| for i, q := range queueNames { |
| if q == "" { |
| q = "default" |
| } |
| req.QueueName[i] = []byte(q) |
| } |
| res := &pb.TaskQueueFetchQueueStatsResponse{} |
| if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil { |
| return nil, err |
| } |
| qs := make([]QueueStatistics, len(res.Queuestats)) |
| for i, qsg := range res.Queuestats { |
| qs[i] = QueueStatistics{ |
| Tasks: int(*qsg.NumTasks), |
| } |
| if eta := *qsg.OldestEtaUsec; eta > -1 { |
| qs[i].OldestETA = time.Unix(0, eta*1e3) |
| } |
| if si := qsg.ScannerInfo; si != nil { |
| qs[i].Executed1Minute = int(*si.ExecutedLastMinute) |
| qs[i].InFlight = int(si.GetRequestsInFlight()) |
| qs[i].EnforcedRate = si.GetEnforcedRate() |
| } |
| } |
| return qs, nil |
| } |
| |
| func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) { |
| x.Transaction = t |
| } |
| |
| func init() { |
| internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name) |
| |
| // Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue. |
| dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT) |
| internal.RegisterTimeoutErrorCode("taskqueue", dsCode) |
| |
| // Transaction registration. |
| internal.RegisterTransactionSetter(setTransaction) |
| internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) { |
| for _, req := range x.AddRequest { |
| setTransaction(req, t) |
| } |
| }) |
| } |