| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package jobqueue |
| |
| import ( |
| "bytes" |
| "fmt" |
| "os" |
| "os/exec" |
| "path" |
| "strconv" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "v.io/x/playground/lib/event" |
| ) |
| |
| var ( |
| defaultMaxTime = 10 * time.Second |
| defaultMaxSize = 1 << 16 |
| defaultMemLimit = 100 |
| ) |
| |
| func init() { |
| // Compile builder binary and put in path. |
| pgDir := os.ExpandEnv("${V23_ROOT}/release/projects/playground/go") |
| |
| cmd := exec.Command("make", "builder") |
| cmd.Dir = path.Join(pgDir, "src", "v.io", "x", "playground") |
| if out, err := cmd.CombinedOutput(); err != nil { |
| fmt.Println("Error running 'make builder'") |
| fmt.Println(string(out)) |
| panic(err) |
| } |
| |
| pgBinDir := path.Join(pgDir, "bin") |
| if err := os.Setenv("PATH", pgBinDir+":"+os.Getenv("PATH")); err != nil { |
| panic(err) |
| } |
| } |
| |
| // mockTestFile is a simple Go progam that will be sent to the dispatcher in the |
| // test runs. |
| var mockTestFile = ` |
| package main |
| |
| import( |
| "fmt" |
| "time" |
| ) |
| |
| func main() { |
| fmt.Printf("PROGRAM START") |
| time.Sleep(1 * time.Second) |
| fmt.Printf("PROGRAM END") |
| } |
| ` |
| |
| // mockTestBody is a bundle containing the mockTestFile, as it would look if it |
| // came from an http request from the playground client. |
| var mockTestBody = []byte(fmt.Sprintf(`{ |
| "files": [ { |
| "name": "src/main/main.go", |
| "body": %v |
| } ] |
| }`, strconv.Quote(mockTestFile))) |
| |
| // testConfig encapsulates all the parameters and expectations for a single |
| // test run. |
| type testConfig struct { |
| // Number of jobs to enqueue. |
| jobs int |
| |
| // Dispatcher configuration. |
| workers int |
| jobCap int |
| |
| // Job configuration. |
| useDocker bool |
| maxSize int |
| maxTime time.Duration |
| memLimit int |
| |
| // Test expectations. Default is to expect success. |
| expectEnqueueFail bool |
| expectOutputTooLarge bool |
| expectJobFail bool |
| } |
| |
| func newMockResponseEventSink() *event.ResponseEventSink { |
| var b bytes.Buffer |
| return event.NewResponseEventSink(&b, false) |
| } |
| |
| // eventsMatch returns true iff there is an event whose message matches the |
| // given string. |
| func eventsMatch(events []event.Event, match string) bool { |
| for _, e := range events { |
| if strings.Contains(e.Message, match) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // assertExpectedResult asserts that the result matches the test expectations |
| // in the test config. |
| func assertExpectedResult(t *testing.T, c testConfig, r Result) { |
| expectSuccess := !c.expectJobFail |
| if expectSuccess != r.Success { |
| t.Errorf("Expected result.Success to be %v but was %v. Test config: %#v", expectSuccess, r.Success, c) |
| } |
| |
| if !r.Success { |
| return |
| } |
| |
| if c.expectOutputTooLarge { |
| want := "Program output too large, killed." |
| if !eventsMatch(r.Events, want) { |
| t.Errorf("Event message %v not found in %#v. Test config: %#v", want, r.Events, c) |
| } |
| return |
| } |
| |
| // Expect normal program output. |
| want := "PROGRAM START" |
| if !eventsMatch(r.Events, want) { |
| t.Errorf("Event message %v not found in %#v. Test config: %#v", want, r.Events, c) |
| } |
| |
| want = "PROGRAM END" |
| if !eventsMatch(r.Events, want) { |
| t.Errorf("Event message %v not found in %#v. Test config: %#v", want, r.Events, c) |
| } |
| } |
| |
| // runTest runs a particular test configuration. It starts a dispatcher, queues |
| // jobs and waits for them to finish, and asserts that they match the |
| // expectations in the testConfig. |
| func runTest(t *testing.T, c testConfig) { |
| fmt.Printf("Testing %v jobs on %v workers with jobCap of %v and useDocker %v\n", c.jobs, c.workers, c.jobCap, c.useDocker) |
| d := NewDispatcher(c.workers, c.jobCap) |
| |
| var enqueueError error |
| |
| // wg waits for all the jobs to finish. |
| var wg sync.WaitGroup |
| |
| // Start all the jobs. |
| for i := 0; i < c.jobs; i++ { |
| res := newMockResponseEventSink() |
| job := NewJob(mockTestBody, res, c.maxSize, c.maxTime, c.useDocker, c.memLimit) |
| |
| resultChan, err := d.Enqueue(job) |
| if err != nil { |
| enqueueError = err |
| break |
| } |
| |
| wg.Add(1) |
| |
| go func() { |
| r := <-resultChan |
| assertExpectedResult(t, c, r) |
| wg.Done() |
| }() |
| } |
| |
| if !c.expectEnqueueFail && enqueueError != nil { |
| t.Fatalf("Enqueue failed: %v. Test config: %#v", enqueueError, c) |
| } |
| |
| if c.expectEnqueueFail && enqueueError == nil { |
| t.Fatalf("Expected Enqueue to fail but it did not. Test config: %#v", c) |
| } |
| |
| // Test must finish in 30 seconds. |
| timeout := time.After(30 * time.Second) |
| |
| jobsFinished := make(chan bool) |
| |
| go func() { |
| wg.Wait() |
| jobsFinished <- true |
| }() |
| |
| // Wait for timeout or all jobs to finish. |
| select { |
| case <-timeout: |
| t.Fatalf("Expected jobs to complete but got timeout. Test config: %#v", c) |
| case <-jobsFinished: |
| } |
| |
| d.Stop() |
| } |
| |
| func TestJobQueue(t *testing.T) { |
| |
| // Test success cases without docker. |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 3, |
| workers: 1, |
| jobCap: 3, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 3, |
| workers: 3, |
| jobCap: 3, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 6, |
| workers: 3, |
| jobCap: 10, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| }) |
| |
| // Test success cases with docker. |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 3, |
| workers: 1, |
| jobCap: 3, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 3, |
| workers: 3, |
| jobCap: 3, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 6, |
| workers: 3, |
| jobCap: 10, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| }) |
| |
| // Test Enqueue should fail when job capacity is exceeded. |
| runTest(t, testConfig{ |
| jobs: 5, |
| workers: 2, |
| jobCap: 2, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| expectEnqueueFail: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 5, |
| workers: 2, |
| jobCap: 2, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| expectEnqueueFail: true, |
| }) |
| |
| // Test job should fail if it exceeds size limit. |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: 100, |
| maxTime: defaultMaxTime, |
| useDocker: false, |
| expectOutputTooLarge: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: 100, |
| maxTime: defaultMaxTime, |
| memLimit: defaultMemLimit, |
| useDocker: true, |
| expectOutputTooLarge: true, |
| }) |
| |
| // Test job should fail if it exceeds max time. |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: defaultMaxSize, |
| maxTime: 1 * time.Second, |
| useDocker: false, |
| expectJobFail: true, |
| }) |
| |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: defaultMaxSize, |
| maxTime: 1 * time.Second, |
| useDocker: true, |
| expectJobFail: true, |
| }) |
| |
| // Test job should fail if it exceeds memory limit (docker only). |
| runTest(t, testConfig{ |
| jobs: 1, |
| workers: 1, |
| jobCap: 1, |
| maxSize: defaultMaxSize, |
| maxTime: defaultMaxTime, |
| memLimit: 5, |
| useDocker: true, |
| expectJobFail: true, |
| }) |
| } |
| |
| func TestJobCancel(t *testing.T) { |
| d := NewDispatcher(1, 10) |
| |
| // Create five jobs. |
| res1 := newMockResponseEventSink() |
| job1 := NewJob(mockTestBody, res1, defaultMaxSize, defaultMaxTime, false, defaultMemLimit) |
| |
| res2 := newMockResponseEventSink() |
| job2 := NewJob(mockTestBody, res2, defaultMaxSize, defaultMaxTime, false, defaultMemLimit) |
| |
| res3 := newMockResponseEventSink() |
| job3 := NewJob(mockTestBody, res3, defaultMaxSize, defaultMaxTime, false, defaultMemLimit) |
| |
| res4 := newMockResponseEventSink() |
| job4 := NewJob(mockTestBody, res4, defaultMaxSize, defaultMaxTime, false, defaultMemLimit) |
| |
| res5 := newMockResponseEventSink() |
| job5 := NewJob(mockTestBody, res5, defaultMaxSize, defaultMaxTime, false, defaultMemLimit) |
| |
| // Cancel first job right away. |
| job1.Cancel() |
| |
| // Queue all jobs. |
| resultChan1, err := d.Enqueue(job1) |
| if err != nil { |
| t.Fatalf("Enqueue failed: %v", err) |
| } |
| resultChan2, err := d.Enqueue(job2) |
| if err != nil { |
| t.Fatalf("Enqueue failed: %v", err) |
| } |
| resultChan3, err := d.Enqueue(job3) |
| if err != nil { |
| t.Fatalf("Enqueue failed: %v", err) |
| } |
| resultChan4, err := d.Enqueue(job4) |
| if err != nil { |
| t.Fatalf("Enqueue failed: %v", err) |
| } |
| resultChan5, err := d.Enqueue(job5) |
| if err != nil { |
| t.Fatalf("Enqueue failed: %v", err) |
| } |
| |
| r1 := <-resultChan1 |
| r2 := <-resultChan2 |
| |
| // Cancel second job after it has already run. |
| job2.Cancel() |
| |
| // Cancel fourth job after it has been queued, but before it has run. |
| job4.Cancel() |
| |
| r3 := <-resultChan3 |
| r4 := <-resultChan4 |
| |
| // Wait half a second to give job 5 a chance to start running, then cancel it. |
| time.Sleep(500 * time.Millisecond) |
| job5.Cancel() |
| |
| r5 := <-resultChan5 |
| |
| // Check that jobs 1 and 4 failed. |
| if r1.Success { |
| t.Errorf("Expected job 1 to fail but it succeeded.") |
| } |
| if r4.Success { |
| t.Errorf("Expected job 4 to fail but it succeeded.") |
| } |
| |
| // Check that jobs 2, 3, and 5 succeeded. |
| if !r2.Success { |
| t.Errorf("Expected job 2 to succeed but it failed.") |
| } |
| if !r3.Success { |
| t.Errorf("Expected job 3 to succeed but it failed.") |
| } |
| if !r5.Success { |
| t.Errorf("Expected job 5 to succeed but it failed.") |
| } |
| } |