Tests for /compile route.
These test:
* That bad requests are rejected.
* Successful test runs are cached.
* Failed test runs are not cached.
I had to mock out the Dispatcher, so some changes were made in jobqueue
to make Dispatcher an interface, and expose some of the types.
Change-Id: I43da94862e8917c5cb55707d80613a391e7fb5a6
diff --git a/go/src/playground/compilerd/compile.go b/go/src/playground/compilerd/compile.go
index 217b15a..a63a549 100644
--- a/go/src/playground/compilerd/compile.go
+++ b/go/src/playground/compilerd/compile.go
@@ -31,9 +31,6 @@
// perhaps be optimized.
cache = lru.New(10000)
- // dispatcher schedules jobs to be run on a fixed number of workers.
- dispatcher *jobqueue.Dispatcher
-
useDocker = flag.Bool("use-docker", true, "Whether to use Docker to run builder; if false, we run the builder directly.")
// TODO(nlacasse): Experiment with different values for parallelism and
@@ -60,13 +57,22 @@
Events []event.Event
}
-func startDispatcher() {
- dispatcher = jobqueue.NewDispatcher(*parallelism, *jobQueueCap)
+// compiler handles compile requests by enqueuing them on the dispatcher's
+// work queue.
+type compiler struct {
+ dispatcher jobqueue.Dispatcher
+}
+
+// newCompiler creates a new compiler.
+func newCompiler() *compiler {
+ return &compiler{
+ dispatcher: jobqueue.NewDispatcher(*parallelism, *jobQueueCap),
+ }
}
// POST request that returns cached results if any exist, otherwise schedules
// the bundle to be run and caches the results.
-func handlerCompile(w http.ResponseWriter, r *http.Request) {
+func (c *compiler) handlerCompile(w http.ResponseWriter, r *http.Request) {
if !handleCORS(w, r) {
return
}
@@ -127,14 +133,19 @@
// Create a new compile job and queue it.
job := jobqueue.NewJob(requestBody, res, *maxSize, *maxTime, *useDocker, dockerMem)
- resultChan, err := dispatcher.Enqueue(job)
+ resultChan, err := c.dispatcher.Enqueue(job)
if err != nil {
// TODO(nlacasse): This should send a StatusServiceUnavailable, not a StatusOK.
res.Write(event.New("", "stderr", "Service busy. Please try again later."))
return
}
- clientDisconnect := w.(http.CloseNotifier).CloseNotify()
+ // Go's httptest.NewRecorder does not support http.CloseNotifier, so we
+ // can't assume that w.(httpCloseNotifier) will succeed.
+ var clientDisconnect <-chan bool
+ if closeNotifier, ok := w.(http.CloseNotifier); ok {
+ clientDisconnect = closeNotifier.CloseNotify()
+ }
// Wait for job to finish and cache results if job succeeded.
// We do this in a for loop because we always need to wait for the result
@@ -162,3 +173,9 @@
}
}
}
+
+// stop waits for any in-progress jobs to finish, and cancels any jobs that
+// have not started running yet.
+func (c *compiler) stop() {
+ c.dispatcher.Stop()
+}
diff --git a/go/src/playground/compilerd/compile_test.go b/go/src/playground/compilerd/compile_test.go
new file mode 100644
index 0000000..040b230
--- /dev/null
+++ b/go/src/playground/compilerd/compile_test.go
@@ -0,0 +1,177 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "playground/compilerd/jobqueue"
+ "playground/lib/event"
+)
+
+// mockDispatcher implements the jobqueue.Dispatcher interface.
+type mockDispatcher struct {
+ jobs []*jobqueue.Job
+ sendSuccess bool
+}
+
+// Enqueue responds to every job after 100ms. The only event message will
+// contain the job body. The result will be a success if "sendSuccess" is true.
+func (d *mockDispatcher) Enqueue(j *jobqueue.Job) (chan jobqueue.Result, error) {
+ d.jobs = append(d.jobs, j)
+
+ e := event.Event{
+ Message: string(j.Body()),
+ }
+
+ result := jobqueue.Result{
+ Success: d.sendSuccess,
+ Events: []event.Event{e},
+ }
+
+ resultChan := make(chan jobqueue.Result)
+
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ resultChan <- result
+ }()
+
+ return resultChan, nil
+}
+
+func (d *mockDispatcher) Stop() {
+}
+
+var _ = jobqueue.Dispatcher((*mockDispatcher)(nil))
+
+func sendCompileRequest(c *compiler, method string, body io.Reader) *httptest.ResponseRecorder {
+ path := "/compile"
+ req, err := http.NewRequest(method, path, body)
+ if err != nil {
+ panic(err)
+ }
+
+ w := httptest.NewRecorder()
+ c.handlerCompile(w, req)
+ return w
+}
+
+func TestNonPostMethodsAreBadRequests(t *testing.T) {
+ c := &compiler{}
+ body := bytes.NewBufferString("foobar")
+ methods := []string{"GET", "PUT", "DELETE", "FOOBAR"}
+ for _, method := range methods {
+ w := sendCompileRequest(c, method, body)
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("Expected HTTP method %v to result in status %v but got %v", method, http.StatusBadRequest, w.Code)
+ }
+ }
+}
+
+func TestEmptyBodyIsBadRequest(t *testing.T) {
+ c := &compiler{}
+ w := sendCompileRequest(c, "POST", nil)
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("Expected POST with empty body to result in status %v but got %v", http.StatusBadRequest, w.Code)
+ }
+}
+
+func TestLargeBodyIsBadRequest(t *testing.T) {
+ c := &compiler{}
+ body := bytes.NewBuffer(make([]byte, *maxSize+1))
+ w := sendCompileRequest(c, "POST", body)
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("Expected POST with large body to result in status %v but got %v", http.StatusBadRequest, w.Code)
+ }
+}
+
+func TestSuccessResultsAreCached(t *testing.T) {
+ dispatcher := &mockDispatcher{
+ sendSuccess: true,
+ }
+ c := &compiler{
+ dispatcher: dispatcher,
+ }
+
+ bodyString := "foobar"
+ body := bytes.NewBufferString(bodyString)
+ bodyBytes := body.Bytes()
+ requestBodyHash := rawHash(bodyBytes)
+
+ // Check that body is not already in cache.
+ if _, ok := cache.Get(requestBodyHash); ok {
+ t.Errorf("Expected request body not to be in cache, but it was.")
+ }
+
+ w := sendCompileRequest(c, "POST", body)
+ if w.Code != http.StatusOK {
+ t.Errorf("Expected POST with body %v to result in status %v but got %v", bodyString, http.StatusOK, w.Code)
+ }
+
+ // Check that the dispatcher queued the job.
+ if len(dispatcher.jobs) != 1 {
+ t.Errorf("Expected len(dispatcher.jobs) to be 1 but got %v", len(dispatcher.jobs))
+ }
+
+ // Check that body is now in the cache.
+ if cr, ok := cache.Get(requestBodyHash); !ok {
+ t.Errorf("Expected request body to be in cache, but it was not.")
+ } else {
+ cachedResponseStruct := cr.(cachedResponse)
+ if cachedResponseStruct.Status != http.StatusOK {
+ t.Errorf("Expected cached result status to be %v but got %v", http.StatusOK, cachedResponseStruct.Status)
+ }
+ want := string(bodyBytes)
+ if len(cachedResponseStruct.Events) != 1 || cachedResponseStruct.Events[0].Message != want {
+ t.Errorf("Expected cached result body to contain single event with message %v but got %v", want, cachedResponseStruct.Events)
+
+ }
+
+ // Check that the dispatcher did not queue the second request, since it was in the cache.
+ if len(dispatcher.jobs) != 1 {
+ t.Errorf("Expected len(dispatcher.jobs) to be 1 but got %v", len(dispatcher.jobs))
+ }
+ }
+}
+
+func TestErrorResultsAreNotCached(t *testing.T) {
+ dispatcher := &mockDispatcher{
+ sendSuccess: false,
+ }
+
+ c := &compiler{
+ dispatcher: dispatcher,
+ }
+
+ bodyString := "bazbar"
+ body := bytes.NewBufferString(bodyString)
+ bodyBytes := body.Bytes()
+ requestBodyHash := rawHash(bodyBytes)
+
+ // Check that body is not already in cache.
+ if _, ok := cache.Get(requestBodyHash); ok {
+ t.Errorf("Expected request body not to be in cache, but it was.")
+ }
+
+ w := sendCompileRequest(c, "POST", body)
+ if w.Code != http.StatusOK {
+ t.Errorf("Expected POST with body %v to result in status %v but got %v", bodyString, http.StatusOK, w.Code)
+ }
+
+ // Check that the dispatcher queued the request.
+ if len(dispatcher.jobs) != 1 {
+ t.Errorf("Expected len(dispatcher.jobs) to be 1 but got %v", len(dispatcher.jobs))
+ }
+
+ // Check that body is still not in the cache.
+ if _, ok := cache.Get(requestBodyHash); ok {
+ t.Errorf("Expected request body not to be in cache, but it was.")
+ }
+}
diff --git a/go/src/playground/compilerd/jobqueue/jobqueue.go b/go/src/playground/compilerd/jobqueue/jobqueue.go
index 9c3a195..553e27d 100644
--- a/go/src/playground/compilerd/jobqueue/jobqueue.go
+++ b/go/src/playground/compilerd/jobqueue/jobqueue.go
@@ -20,6 +20,12 @@
// reads a worker off the worker queue, and then reads a job off the job
// queue, and runs that job on that worker. When the job finishes, the worker
// is pushed back on to the worker queue.
+//
+// TODO(nlacasse): There are many types and functions exported in this file
+// which are only exported because they are used by the compile test, in
+// particular Job, Dispatcher, and Result types, and their constructors and
+// methods. Consider refactoring the compiler so those tests and the logic
+// they test become part of this package.
package jobqueue
@@ -52,11 +58,11 @@
}()
}
-type job struct {
+type Job struct {
id string
body []byte
res *event.ResponseEventSink
- resultChan chan result
+ resultChan chan Result
maxSize int
maxTime time.Duration
@@ -67,8 +73,8 @@
cancelled bool
}
-func NewJob(body []byte, res *event.ResponseEventSink, maxSize int, maxTime time.Duration, useDocker bool, dockerMem int) *job {
- return &job{
+func NewJob(body []byte, res *event.ResponseEventSink, maxSize int, maxTime time.Duration, useDocker bool, dockerMem int) *Job {
+ return &Job{
id: <-uniq,
body: body,
res: res,
@@ -79,21 +85,33 @@
// resultChan has capacity 1 so that writing to the channel won't block
// if nobody ever reads the result.
- resultChan: make(chan result, 1),
+ resultChan: make(chan Result, 1),
}
}
+// Body is a getter for Job.body.
+func (j *Job) Body() []byte {
+ return j.body
+}
+
// Cancel will prevent the job from being run, if it has not already been
// started by a worker.
-func (j *job) Cancel() {
+func (j *Job) Cancel() {
j.mu.Lock()
defer j.mu.Unlock()
log.Printf("Cancelling job %v.\n", j.id)
j.cancelled = true
}
-type Dispatcher struct {
- jobQueue chan *job
+// Dispatcher is an interface type so it can be mocked during tests.
+type Dispatcher interface {
+ Enqueue(j *Job) (chan Result, error)
+ Stop()
+}
+
+// dispatcherImpl implements Dispatcher interface.
+type dispatcherImpl struct {
+ jobQueue chan *Job
// A message sent on the stopped channel causes the dispatcher to stop
// assigning new jobs to workers.
@@ -104,9 +122,11 @@
wg sync.WaitGroup
}
-func NewDispatcher(workers int, jobQueueCap int) *Dispatcher {
- d := &Dispatcher{
- jobQueue: make(chan *job, jobQueueCap),
+var _ = Dispatcher((*dispatcherImpl)(nil))
+
+func NewDispatcher(workers int, jobQueueCap int) Dispatcher {
+ d := &dispatcherImpl{
+ jobQueue: make(chan *Job, jobQueueCap),
stopped: make(chan bool),
}
@@ -116,7 +136,7 @@
// start starts a given number of workers, then reads from the jobQueue and
// assigns jobs to free workers.
-func (d *Dispatcher) start(num int) {
+func (d *dispatcherImpl) start(num int) {
log.Printf("Dispatcher starting %d workers.\n", num)
// Workers are published on the workerQueue when they are free.
@@ -147,7 +167,7 @@
job.mu.Unlock()
if cancelled {
log.Printf("Dispatcher encountered cancelled job %v, rejecting.\n", job.id)
- job.resultChan <- result{
+ job.resultChan <- Result{
Success: false,
Events: nil,
}
@@ -173,7 +193,7 @@
select {
case job := <-d.jobQueue:
log.Printf("Dispatcher is stopped, rejecting job %v.\n", job.id)
- job.resultChan <- result{
+ job.resultChan <- Result{
Success: false,
Events: nil,
}
@@ -192,7 +212,7 @@
// TODO(nlacasse): Consider letting the dispatcher run all currently queued
// jobs, rather than rejecting them. Or, put logic in the client to retry
// cancelled jobs.
-func (d *Dispatcher) Stop() {
+func (d *dispatcherImpl) Stop() {
log.Printf("Stopping dispatcher.\n")
d.stopped <- true
@@ -202,7 +222,7 @@
// Enqueue queues a job to be run be the next available worker. It returns a
// channel on which the job's results will be published.
-func (d *Dispatcher) Enqueue(j *job) (chan result, error) {
+func (d *dispatcherImpl) Enqueue(j *Job) (chan Result, error) {
select {
case d.jobQueue <- j:
return j.resultChan, nil
@@ -211,7 +231,7 @@
}
}
-type result struct {
+type Result struct {
Success bool
Events []event.Event
}
@@ -228,7 +248,7 @@
// run compiles and runs a job, caches the result, and returns the result on
// the job's result channel.
-func (w *worker) run(j *job) result {
+func (w *worker) run(j *Job) Result {
event.Debug(j.res, "Preparing to run program")
memoryFlag := fmt.Sprintf("%dm", j.dockerMem)
@@ -379,12 +399,12 @@
// TODO(sadovsky): This policy is helpful for development, but may not be wise
// for production. Revisit.
if !timedOut && !erroredOut {
- return result{
+ return Result{
Success: true,
Events: j.res.PopWrittenEvents(),
}
} else {
- return result{
+ return Result{
Success: false,
Events: nil,
}
diff --git a/go/src/playground/compilerd/jobqueue/jobqueue_test.go b/go/src/playground/compilerd/jobqueue/jobqueue_test.go
index e00877a..f5bfd44 100644
--- a/go/src/playground/compilerd/jobqueue/jobqueue_test.go
+++ b/go/src/playground/compilerd/jobqueue/jobqueue_test.go
@@ -109,7 +109,7 @@
// assertExpectedResult asserts that the result matches the test expectations
// in the test config.
-func assertExpectedResult(t *testing.T, c testConfig, r result) {
+func assertExpectedResult(t *testing.T, c testConfig, r Result) {
expectSuccess := !c.expectJobFail
if expectSuccess != r.Success {
t.Errorf("Expected result.Success to be %v but was %v. Test config: %#v", expectSuccess, r.Success, c)
diff --git a/go/src/playground/compilerd/main.go b/go/src/playground/compilerd/main.go
index 3e01795..a47adab 100644
--- a/go/src/playground/compilerd/main.go
+++ b/go/src/playground/compilerd/main.go
@@ -77,6 +77,8 @@
panic(err)
}
+ c := newCompiler()
+
listenForNs := listenTimeout.Nanoseconds()
if listenForNs > 0 {
delayNs := listenForNs/2 + rand.Int63n(listenForNs/2)
@@ -85,16 +87,14 @@
// damage. We want to exit cleanly before then so we don't cause requests
// to fail. When compilerd exits, a watchdog will shut the machine down
// after a short delay.
- go waitForExit(time.Nanosecond * time.Duration(delayNs))
+ go waitForExit(c, time.Nanosecond*time.Duration(delayNs))
}
if err := initDBHandles(); err != nil {
log.Fatal(err)
}
- startDispatcher()
-
- http.HandleFunc("/compile", handlerCompile)
+ http.HandleFunc("/compile", c.handlerCompile)
http.HandleFunc("/load", handlerLoad)
http.HandleFunc("/save", handlerSave)
http.HandleFunc("/healthz", handlerHealthz)
@@ -105,7 +105,7 @@
}
}
-func waitForExit(limit time.Duration) {
+func waitForExit(c *compiler, limit time.Duration) {
// Exit if we get a SIGTERM.
term := make(chan os.Signal, 1)
signal.Notify(term, syscall.SIGTERM)
@@ -136,8 +136,8 @@
}
}()
- // Stop the dispatcher and wait for all in-progress jobs to finish.
- dispatcher.Stop()
+ // Stop the compiler and wait for all in-progress jobs to finish.
+ c.stop()
// Give the server some extra time to send any remaning responses that are
// queued to be sent.