Merge "client/lib/shell.sh: VEYRON_* environment variables are now V23_*"
diff --git a/client/Makefile b/client/Makefile
index d86ce20..3e46715 100644
--- a/client/Makefile
+++ b/client/Makefile
@@ -21,7 +21,7 @@
deploy-staging: all
git rev-parse --verify HEAD > public/version
gcloud config set project vanadium-staging
- gsutil -m rsync -d -r public gs://staging.playground.v.io
+ gsutil -m rsync -d -r public gs://playground.staging.v.io
public/bundle.js: browser/index.js $(js_files) node_modules
browserify --debug $< 1> $@
diff --git a/go/src/playground/Makefile b/go/src/playground/Makefile
index b4f2814..4fabbb8 100644
--- a/go/src/playground/Makefile
+++ b/go/src/playground/Makefile
@@ -19,7 +19,7 @@
compilerd \
--sqlconf=$< \
--setupdb=true \
- --listenTimeout=0 \
+ --listen-timeout=0 \
--address=$(host):$(port) \
--use-docker=false
diff --git a/go/src/playground/README.md b/go/src/playground/README.md
index a49e371..0cab349 100644
--- a/go/src/playground/README.md
+++ b/go/src/playground/README.md
@@ -53,12 +53,12 @@
Run the compiler binary:
- $ $VANADIUM_ROOT/release/projects/playground/go/bin/compilerd --listenTimeout=0 --address=localhost:8181
+ $ $VANADIUM_ROOT/release/projects/playground/go/bin/compilerd --listen-timeout=0 --address=localhost:8181
Or, run it without Docker (for faster iterations during development):
$ cd $(mktemp -d "/tmp/XXXXXXXX")
- $ PATH=$VANADIUM_ROOT/release/go/bin:$VANADIUM_ROOT/release/projects/playground/go/bin:$PATH compilerd --listenTimeout=0 --address=localhost:8181 --use-docker=false
+ $ PATH=$VANADIUM_ROOT/release/go/bin:$VANADIUM_ROOT/release/projects/playground/go/bin:$PATH compilerd --listen-timeout=0 --address=localhost:8181 --use-docker=false
The server should now be running at http://localhost:8181 and responding to
compile requests at http://localhost:8181/compile.
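Reviewer note: as a quick sanity check of the endpoint described above, a short Go program like the sketch below can POST a bundle and stream the raw response. This is only an assumption-laden illustration: `bundle.json` is a placeholder (the bundle format is defined by the playground client, not this diff), and the response is copied verbatim rather than parsed; only the `/compile` path and the `debug=1` query parameter are taken from the code in this change.

```go
// Minimal sketch: POST a (hypothetical) bundle file to a locally running
// compilerd and stream the chunked JSON event response to stdout.
package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// bundle.json is a placeholder; its format comes from the playground client.
	f, err := os.Open("bundle.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// debug=1 asks compilerd to include debug events in the response.
	resp, err := http.Post("http://localhost:8181/compile?debug=1", "application/json", f)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The response is a stream of JSON events; copy it as-is.
	if _, err := io.Copy(os.Stdout, resp.Body); err != nil {
		log.Fatal(err)
	}
}
```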
diff --git a/go/src/playground/compilerd/compile.go b/go/src/playground/compilerd/compile.go
index 7a164d0..217b15a 100644
--- a/go/src/playground/compilerd/compile.go
+++ b/go/src/playground/compilerd/compile.go
@@ -12,43 +12,60 @@
package main
import (
- "bufio"
- "bytes"
- "encoding/json"
"flag"
- "fmt"
- "io"
"log"
"net/http"
- "os/exec"
- "sync"
"time"
"github.com/golang/groupcache/lru"
+ "playground/compilerd/jobqueue"
"playground/lib"
"playground/lib/event"
)
var (
+ // In-memory LRU cache of request/response bodies. Keys are sha256 sums of
+ // request bodies (32 bytes each), values are of type cachedResponse.
+ // NOTE(nlacasse): The cache size (10k) was chosen arbitrarily and should
+ // perhaps be optimized.
+ cache = lru.New(10000)
+
+ // dispatcher schedules jobs to be run on a fixed number of workers.
+ dispatcher *jobqueue.Dispatcher
+
useDocker = flag.Bool("use-docker", true, "Whether to use Docker to run builder; if false, we run the builder directly.")
+ // TODO(nlacasse): Experiment with different values for parallelism and
+ // dockerMemLimit once we have performance testing.
+ // There should be some memory left over for the system, http server, and
+ // docker daemon. The GCE n1-standard machines have 3.75GB of RAM, so the
+ // default value below should leave plenty of room.
+ parallelism = flag.Int("parallelism", 5, "Maximum number of builds to run in parallel.")
+ dockerMemLimit = flag.Int("total-docker-memory", 3000, "Total memory limit for all Docker build instances in MB.")
+
// Arbitrary deadline (enough to compile, run, shutdown).
// TODO(sadovsky): For now this is set high to avoid spurious timeouts.
// Playground execution speed needs to be optimized.
- maxTime = 10 * time.Second
+ maxTime = flag.Duration("max-time", 10*time.Second, "Maximum time for build to run.")
- // In-memory LRU cache of request/response bodies. Keys are sha256 sums of
- // request bodies (32 bytes each), values are of type CachedResponse.
- // NOTE(nlacasse): The cache size (10k) was chosen arbitrarily and should
- // perhaps be optimized.
- cache = lru.New(10000)
+ // TODO(nlacasse): The default value of 100 was chosen arbitrarily and
+ // should be tuned.
+ jobQueueCap = flag.Int("job-queue-capacity", 100, "Maximum number of jobs to allow in the job queue. Attempting to add a new job will fail if the queue is full.")
)
-//////////////////////////////////////////
-// HTTP request handler
+// cachedResponse is the type of values stored in the lru cache.
+type cachedResponse struct {
+ Status int
+ Events []event.Event
+}
-// POST request that compiles and runs the bundle and streams output to client.
+func startDispatcher() {
+ dispatcher = jobqueue.NewDispatcher(*parallelism, *jobQueueCap)
+}
+
+// POST request that returns cached results if any exist, otherwise schedules
+// the bundle to be run and caches the results.
func handlerCompile(w http.ResponseWriter, r *http.Request) {
if !handleCORS(w, r) {
return
@@ -57,7 +74,7 @@
// Check method and read POST body.
// Limit is set to maxSize+1 to allow distinguishing between exactly maxSize
// and larger than maxSize requests.
- requestBody := getPostBody(w, r, maxSize+1)
+ requestBody := getPostBody(w, r, *maxSize+1)
if requestBody == nil {
return
}
@@ -67,18 +84,18 @@
// sensitive information, so guarding with a query parameter is sufficient.
wantDebug := r.FormValue("debug") == "1"
- openResponse := func(status int) *responseEventSink {
+ openResponse := func(status int) *event.ResponseEventSink {
w.Header().Add("Content-Type", "application/json")
// No Content-Length, using chunked encoding.
w.WriteHeader(status)
// The response is hard limited to 2*maxSize: maxSize for builder stdout,
// and another maxSize for compilerd error and status messages.
- return newResponseEventSink(lib.NewLimitedWriter(w, 2*maxSize, lib.DoOnce(func() {
+ return event.NewResponseEventSink(lib.NewLimitedWriter(w, 2*(*maxSize), lib.DoOnce(func() {
log.Println("Hard response size limit reached.")
})), !wantDebug)
}
- if len(requestBody) > maxSize {
+ if len(requestBody) > *maxSize {
res := openResponse(http.StatusBadRequest)
res.Write(event.New("", "stderr", "Program too large."))
return
@@ -89,251 +106,59 @@
// NOTE(sadovsky): In the client we may shift timestamps (based on current
// time) and introduce a fake delay.
requestBodyHash := rawHash(requestBody)
- if cachedResponse, ok := cache.Get(requestBodyHash); ok {
- if cachedResponseStruct, ok := cachedResponse.(CachedResponse); ok {
+ if cr, ok := cache.Get(requestBodyHash); ok {
+ if cachedResponseStruct, ok := cr.(cachedResponse); ok {
res := openResponse(cachedResponseStruct.Status)
event.Debug(res, "Sending cached response")
res.Write(cachedResponseStruct.Events...)
return
} else {
- log.Panicf("Invalid cached response: %v\n", cachedResponse)
+ log.Panicf("Invalid cached response: %v\n", cr)
}
}
res := openResponse(http.StatusOK)
- id := <-uniq
-
- event.Debug(res, "Preparing to run program")
-
- // TODO(sadovsky): Set runtime constraints on CPU and memory usage.
- // http://docs.docker.com/reference/run/#runtime-constraints-on-cpu-and-memory
- var cmd *exec.Cmd
- if *useDocker {
- cmd = docker("run", "-i", "--name", id, "playground")
- } else {
- cmd = exec.Command("builder")
- }
- cmdKill := lib.DoOnce(func() {
- event.Debug(res, "Killing program")
- // The docker client can get in a state where stopping/killing/rm-ing
- // the container will not kill the client. The opposite should work
- // correctly (killing the docker client stops the container).
- // If not, the docker rm call below will.
- // Note, this wouldn't be sufficient if docker was called through sudo
- // since sudo doesn't pass sigkill to child processes.
- cmd.Process.Kill()
- })
-
- cmd.Stdin = bytes.NewReader(requestBody)
-
- // Builder will return all normal output as JSON Events on stdout, and will
- // return unexpected errors on stderr.
- // TODO(sadovsky): Security issue: what happens if the program output is huge?
- // We can restrict memory use of the Docker container, but these buffers are
- // outside Docker.
- // TODO(ivanpi): Revisit above comment.
- sizedOut := false
- erroredOut := false
-
- userLimitCallback := func() {
- sizedOut = true
- cmdKill()
- }
- systemLimitCallback := func() {
- erroredOut = true
- cmdKill()
- }
- userErrorCallback := func(err error) {
- // A relay error can result from unparseable JSON caused by a builder bug
- // or a malicious exploit inside Docker. Panicking could lead to a DoS.
- log.Println(id, "builder stdout relay error:", err)
- erroredOut = true
- cmdKill()
+	// Calculate memory limit for the docker instance running this job.
+	// With the default flags this works out to 3000 MB / 5 = 600 MB per instance.
+ dockerMem := *dockerMemLimit
+ if *parallelism > 0 {
+ dockerMem /= *parallelism
}
- outRelay, outStop := limitedEventRelay(res, maxSize, userLimitCallback, userErrorCallback)
- // Builder stdout should already contain a JSON Event stream.
- cmd.Stdout = outRelay
-
- // Any stderr is unexpected, most likely a bug (panic) in builder, but could
- // also result from a malicious exploit inside Docker.
- // It is quietly logged as long as it doesn't exceed maxSize.
- errBuffer := new(bytes.Buffer)
- cmd.Stderr = lib.NewLimitedWriter(errBuffer, maxSize, systemLimitCallback)
-
- event.Debug(res, "Running program")
-
- timeout := time.After(maxTime)
- // User code execution is time limited in builder.
- // This flag signals only unexpected timeouts. maxTime should be sufficient
- // for end-to-end request processing by builder for worst-case user input.
- // TODO(ivanpi): builder doesn't currently time compilation, so builder
- // worst-case execution time is not clearly bounded.
- timedOut := false
-
- exit := make(chan error)
- go func() { exit <- cmd.Run() }()
-
- select {
- case err := <-exit:
- if err != nil && !sizedOut {
- erroredOut = true
- }
- case <-timeout:
- timedOut = true
- cmdKill()
- <-exit
+ // Create a new compile job and queue it.
+ job := jobqueue.NewJob(requestBody, res, *maxSize, *maxTime, *useDocker, dockerMem)
+ resultChan, err := dispatcher.Enqueue(job)
+ if err != nil {
+ // TODO(nlacasse): This should send a StatusServiceUnavailable, not a StatusOK.
+ res.Write(event.New("", "stderr", "Service busy. Please try again later."))
+ return
}
- // Close and wait for the output relay.
- outStop()
+ clientDisconnect := w.(http.CloseNotifier).CloseNotify()
- event.Debug(res, "Program exited")
-
- // Return the appropriate error message to the client.
- if timedOut {
- res.Write(event.New("", "stderr", "Internal timeout, please retry."))
- } else if erroredOut {
- res.Write(event.New("", "stderr", "Internal error, please retry."))
- } else if sizedOut {
- res.Write(event.New("", "stderr", "Program output too large, killed."))
- }
-
- // Log builder internal errors, if any.
- // TODO(ivanpi): Prevent caching? Report to client if debug requested?
- if errBuffer.Len() > 0 {
- log.Println(id, "builder stderr:", errBuffer.String())
- }
-
- event.Debug(res, "Response finished")
-
- // If we timed out or errored out, do not cache anything.
- // TODO(sadovsky): This policy is helpful for development, but may not be wise
- // for production. Revisit.
- if !timedOut && !erroredOut {
- cache.Add(requestBodyHash, CachedResponse{
- Status: http.StatusOK,
- Events: res.popWrittenEvents(),
- })
- event.Debug(res, "Caching response")
- } else {
- event.Debug(res, "Internal errors encountered, not caching response")
- }
-
- // TODO(nlacasse): This "docker rm" can be slow (several seconds), and seems
- // to block other Docker commands, thereby slowing down other concurrent
- // requests. We should figure out how to make it not block other Docker
- // commands. Setting GOMAXPROCS may or may not help.
- // See: https://github.com/docker/docker/issues/6480
- if *useDocker {
- go func() {
- docker("rm", "-f", id).Run()
- }()
- }
-}
-
-//////////////////////////////////////////
-// Event write and cache support
-
-type CachedResponse struct {
- Status int
- Events []event.Event
-}
-
-// Each line written to the returned writer, up to limit bytes total, is parsed
-// into an Event and written to Sink.
-// If the limit is reached or an invalid line read, the corresponding callback
-// is called and the relay stopped.
-// The returned stop() function stops the relaying.
-func limitedEventRelay(sink event.Sink, limit int, limitCallback func(), errorCallback func(err error)) (writer io.Writer, stop func()) {
- pipeReader, pipeWriter := io.Pipe()
- done := make(chan bool)
- stop = lib.DoOnce(func() {
- // Closing the pipe will cause the main relay loop to stop reading (EOF).
- // Writes will fail with ErrClosedPipe.
- pipeReader.Close()
- pipeWriter.Close()
- // Wait for the relay goroutine to finish.
- <-done
- })
- writer = lib.NewLimitedWriter(pipeWriter, limit, func() {
- limitCallback()
- stop()
- })
- go func() {
- bufr := bufio.NewReaderSize(pipeReader, limit)
- var line []byte
- var err error
- // Relay complete lines (events) until EOF or a read error is encountered.
- for line, err = bufr.ReadBytes('\n'); err == nil; line, err = bufr.ReadBytes('\n') {
- var e event.Event
- err = json.Unmarshal(line, &e)
- if err != nil {
- err = fmt.Errorf("failed unmarshalling event: %q", line)
- break
+ // Wait for job to finish and cache results if job succeeded.
+ // We do this in a for loop because we always need to wait for the result
+ // before closing the http handler, since Go panics when writing to the
+ // response after the handler has exited.
+ for {
+ select {
+ case <-clientDisconnect:
+ // If the client disconnects before job finishes, cancel the job.
+ // If job has already started, the job will finish and the results
+ // will be cached.
+ log.Printf("Client disconnected. Cancelling job.")
+ job.Cancel()
+ case result := <-resultChan:
+ if result.Success {
+ event.Debug(res, "Caching response")
+ cache.Add(requestBodyHash, cachedResponse{
+ Status: http.StatusOK,
+ Events: result.Events,
+ })
+ } else {
+ event.Debug(res, "Internal errors encountered, not caching response")
}
- sink.Write(e)
+ return
}
- if err != io.EOF && err != io.ErrClosedPipe {
- errorCallback(err)
- // Use goroutine to prevent deadlock on done channel.
- go stop()
- }
- done <- true
- }()
- return
-}
-
-// Initialize using newResponseEventSink.
-// An event.Sink which also saves all written Events regardless of successful
-// writes to the underlying ResponseWriter.
-type responseEventSink struct {
- // The mutex is used to ensure the same sequence of events being written to
- // both the JsonSink and the written Event array.
- mu sync.Mutex
- event.JsonSink
- written []event.Event
-}
-
-func newResponseEventSink(writer io.Writer, filterDebug bool) *responseEventSink {
- return &responseEventSink{
- JsonSink: *event.NewJsonSink(writer, filterDebug),
}
}
-
-func (r *responseEventSink) Write(events ...event.Event) error {
- r.mu.Lock()
- defer r.mu.Unlock()
- r.written = append(r.written, events...)
- return r.JsonSink.Write(events...)
-}
-
-// Returns and clears the history of Events written to the responseEventSink.
-func (r *responseEventSink) popWrittenEvents() []event.Event {
- r.mu.Lock()
- defer r.mu.Unlock()
- events := r.written
- r.written = nil
- return events
-}
-
-//////////////////////////////////////////
-// Miscellaneous helper functions
-
-func docker(args ...string) *exec.Cmd {
- return exec.Command("docker", args...)
-}
-
-// A channel which returns unique ids for the containers.
-var uniq = make(chan string)
-
-func init() {
- val := time.Now().UnixNano()
- go func() {
- for {
- uniq <- fmt.Sprintf("playground_%d", val)
- val++
- }
- }()
-}
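Reviewer note: the reworked handler combines three patterns that are easy to miss in the diff: an LRU cache keyed by the sha256 of the request body (with a type assertion back to `cachedResponse` on a hit), a queued job whose result arrives on a channel, and a wait loop that reacts to client disconnects via `CloseNotify` while still waiting for the result before returning. The sketch below condenses that structure; the types and the fake job runner are placeholders, not the real playground types, and only the groupcache/lru API and `http.CloseNotifier` are real.

```go
// Condensed sketch of handlerCompile's new structure, with placeholder types.
package main

import (
	"crypto/sha256"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/golang/groupcache/lru"
)

type cachedResponse struct {
	Status int
	Body   string // placeholder for []event.Event
}

type result struct {
	Success bool
	Body    string
}

var cache = lru.New(10000)

// runFakeJob stands in for dispatcher.Enqueue plus a worker running the bundle.
func runFakeJob(body []byte) chan result {
	resultChan := make(chan result, 1)
	go func() {
		time.Sleep(2 * time.Second)
		resultChan <- result{Success: true, Body: fmt.Sprintf("ran %d bytes", len(body))}
	}()
	return resultChan
}

func handler(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	key := sha256.Sum256(body) // [32]byte is comparable, so it works as an lru.Key

	// Return a cached response if one exists; cached values must be type-asserted.
	if v, ok := cache.Get(key); ok {
		if cr, ok := v.(cachedResponse); ok {
			w.WriteHeader(cr.Status)
			fmt.Fprintln(w, cr.Body)
			return
		}
	}

	resultChan := runFakeJob(body)
	clientDisconnect := w.(http.CloseNotifier).CloseNotify()

	// Always wait for the result before returning, even after a disconnect.
	for {
		select {
		case <-clientDisconnect:
			fmt.Println("client disconnected; handlerCompile would cancel the job here")
		case res := <-resultChan:
			if res.Success {
				cache.Add(key, cachedResponse{Status: http.StatusOK, Body: res.Body})
			}
			fmt.Fprintln(w, res.Body)
			return
		}
	}
}

func main() {
	http.HandleFunc("/compile", handler)
	http.ListenAndServe(":8181", nil)
}
```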
diff --git a/go/src/playground/compilerd/jobqueue/jobqueue.go b/go/src/playground/compilerd/jobqueue/jobqueue.go
new file mode 100644
index 0000000..e9e3145
--- /dev/null
+++ b/go/src/playground/compilerd/jobqueue/jobqueue.go
@@ -0,0 +1,376 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package jobqueue implements a queue of jobs dispatched to a fixed number of
+// workers.
+//
+// Usage:
+//
+// dispatcher := NewDispatcher(numWorkers, maxWaitingJobs)
+//
+// job := NewJob(...)
+// resultChan, err := dispatcher.Enqueue(job)
+// r := <-resultChan // r will contain the result of running the job.
+//
+// dispatcher.Stop() // Waits for any in-progress jobs to finish, cancels any
+// // remaining jobs.
+//
+// Internally, the dispatcher has a channel of workers that represents a worker
+// queue, and a channel of jobs that represents a job queue. The dispatcher
+// reads a worker off the worker queue, and then reads a job off the job
+// queue, and runs that job on that worker. When the job finishes, the worker
+// is pushed back on to the worker queue.
+
+package jobqueue
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "os/exec"
+ "sync"
+ "time"
+
+ "playground/lib"
+ "playground/lib/event"
+)
+
+var (
+ // A channel which returns unique playground ids.
+ uniq = make(chan string)
+)
+
+func init() {
+ val := time.Now().UnixNano()
+ go func() {
+ for {
+ uniq <- fmt.Sprintf("playground_%d", val)
+ val++
+ }
+ }()
+}
+
+type job struct {
+ id string
+ body []byte
+ res *event.ResponseEventSink
+ resultChan chan result
+
+ maxSize int
+ maxTime time.Duration
+ useDocker bool
+ dockerMem int
+
+ mu sync.Mutex
+ cancelled bool
+}
+
+func NewJob(body []byte, res *event.ResponseEventSink, maxSize int, maxTime time.Duration, useDocker bool, dockerMem int) job {
+ return job{
+ id: <-uniq,
+ body: body,
+ res: res,
+ maxSize: maxSize,
+ maxTime: maxTime,
+ useDocker: useDocker,
+ dockerMem: dockerMem,
+
+ // resultChan has capacity 1 so that writing to the channel won't block
+ // if nobody ever reads the result.
+ resultChan: make(chan result, 1),
+ }
+}
+
+// Cancel will prevent the job from being run, if it has not already been
+// started by a worker.
+func (j *job) Cancel() {
+ j.mu.Lock()
+ defer j.mu.Unlock()
+ log.Printf("Cancelling job %v.\n", j.id)
+ j.cancelled = true
+}
+
+type Dispatcher struct {
+ jobQueue chan job
+
+ // A message sent on the stopped channel causes the dispatcher to stop
+ // assigning new jobs to workers.
+ stopped chan bool
+
+ // wg represents currently running workers. It is used during Stop to make
+ // sure that all workers have finished running their active jobs.
+ wg sync.WaitGroup
+}
+
+func NewDispatcher(workers int, jobQueueCap int) *Dispatcher {
+ d := &Dispatcher{
+ jobQueue: make(chan job, jobQueueCap),
+ stopped: make(chan bool),
+ }
+
+ d.start(workers)
+ return d
+}
+
+// start starts a given number of workers, then reads from the jobQueue and
+// assigns jobs to free workers.
+func (d *Dispatcher) start(num int) {
+ log.Printf("Dispatcher starting %d workers.\n", num)
+
+ // Workers are published on the workerQueue when they are free.
+ workerQueue := make(chan *worker, num)
+
+ for i := 0; i < num; i++ {
+ worker := newWorker(i)
+ workerQueue <- worker
+ }
+
+ d.wg.Add(1)
+
+ go func() {
+ Loop:
+ for {
+ // Wait for the next available worker.
+ select {
+ case <-d.stopped:
+ break Loop
+ case worker := <-workerQueue:
+ // Read the next job from the job queue.
+ select {
+ case <-d.stopped:
+ break Loop
+ case job := <-d.jobQueue:
+ job.mu.Lock()
+ cancelled := job.cancelled
+ job.mu.Unlock()
+ if cancelled {
+ log.Printf("Dispatcher encountered cancelled job %v, rejecting.", job.id)
+ job.resultChan <- result{
+ Success: false,
+ Events: nil,
+ }
+ } else {
+ log.Printf("Dispatching job %v to worker %v.\n", job.id, worker.id)
+ d.wg.Add(1)
+ go func() {
+ job.resultChan <- worker.run(job)
+ d.wg.Done()
+ workerQueue <- worker
+ }()
+ }
+ }
+ }
+ }
+
+ log.Printf("Dispatcher stopped.\n")
+
+ // Dispatcher stopped, treat all remaining jobs as cancelled.
+ for {
+ select {
+ case job := <-d.jobQueue:
+ log.Printf("Dispatcher is stopped, rejecting job %v.\n", job.id)
+ job.resultChan <- result{
+ Success: false,
+ Events: nil,
+ }
+ default:
+ log.Printf("Dispatcher job queue drained.\n")
+ d.wg.Done()
+ return
+ }
+ }
+ }()
+}
+
+// Stop stops the dispatcher from assigning any new jobs to workers. Jobs that
+// are currently running are allowed to continue. Other jobs are treated as
+// cancelled. Stop blocks until all jobs have finished.
+// TODO(nlacasse): Consider letting the dispatcher run all currently queued
+// jobs, rather than rejecting them. Or, put logic in the client to retry
+// cancelled jobs.
+func (d *Dispatcher) Stop() {
+ log.Printf("Stopping dispatcher.\n")
+ d.stopped <- true
+
+ // Wait for workers to finish their current jobs.
+ d.wg.Wait()
+}
+
+// Enqueue queues a job to be run by the next available worker. It returns a
+// channel on which the job's results will be published.
+func (d *Dispatcher) Enqueue(j job) (chan result, error) {
+ select {
+ case d.jobQueue <- j:
+ return j.resultChan, nil
+ default:
+ return nil, fmt.Errorf("Error queuing job. Job queue full.")
+ }
+}
+
+type result struct {
+ Success bool
+ Events []event.Event
+}
+
+type worker struct {
+ id int
+}
+
+func newWorker(id int) *worker {
+ return &worker{
+ id: id,
+ }
+}
+
+// run compiles and runs a job and returns the result; the dispatcher sends
+// that result on the job's result channel, and the HTTP handler caches it.
+func (w *worker) run(j job) result {
+ event.Debug(j.res, "Preparing to run program")
+
+ memoryFlag := fmt.Sprintf("%dm", j.dockerMem)
+
+ var cmd *exec.Cmd
+ if j.useDocker {
+ // TODO(nlacasse,ivanpi): Limit the CPU resources used by this docker
+		// builder instance. The docker "cpu-shares" flag can only limit one
+ // docker process relative to another, so it's not useful for limiting
+ // the cpu resources of all build instances. The docker "cpuset" flag
+ // can pin the instance to a specific processor, so that might be of
+ // use.
+ cmd = docker("run", "-i",
+ "--name", j.id,
+ // Disable external networking.
+ "--net", "none",
+ // Limit instance memory.
+ "--memory", memoryFlag,
+ // Limit instance memory+swap combined.
+ // Setting to the same value as memory effectively disables swap.
+ "--memory-swap", memoryFlag,
+ "playground")
+ } else {
+ cmd = exec.Command("builder")
+ }
+ cmdKill := lib.DoOnce(func() {
+ event.Debug(j.res, "Killing program")
+ // The docker client can get in a state where stopping/killing/rm-ing
+ // the container will not kill the client. The opposite should work
+ // correctly (killing the docker client stops the container).
+ // If not, the docker rm call below will.
+ // Note, this wouldn't be sufficient if docker was called through sudo
+ // since sudo doesn't pass sigkill to child processes.
+ cmd.Process.Kill()
+ })
+
+ cmd.Stdin = bytes.NewReader(j.body)
+
+ // Builder will return all normal output as JSON Events on stdout, and will
+ // return unexpected errors on stderr.
+ // TODO(sadovsky): Security issue: what happens if the program output is huge?
+ // We can restrict memory use of the Docker container, but these buffers are
+ // outside Docker.
+ // TODO(ivanpi): Revisit above comment.
+ sizedOut := false
+ erroredOut := false
+
+ userLimitCallback := func() {
+ sizedOut = true
+ cmdKill()
+ }
+ systemLimitCallback := func() {
+ erroredOut = true
+ cmdKill()
+ }
+ userErrorCallback := func(err error) {
+ // A relay error can result from unparseable JSON caused by a builder bug
+ // or a malicious exploit inside Docker. Panicking could lead to a DoS.
+ log.Println(j.id, "builder stdout relay error:", err)
+ erroredOut = true
+ cmdKill()
+ }
+
+ outRelay, outStop := event.LimitedEventRelay(j.res, j.maxSize, userLimitCallback, userErrorCallback)
+ // Builder stdout should already contain a JSON Event stream.
+ cmd.Stdout = outRelay
+
+ // Any stderr is unexpected, most likely a bug (panic) in builder, but could
+ // also result from a malicious exploit inside Docker.
+ // It is quietly logged as long as it doesn't exceed maxSize.
+ errBuffer := new(bytes.Buffer)
+ cmd.Stderr = lib.NewLimitedWriter(errBuffer, j.maxSize, systemLimitCallback)
+
+ event.Debug(j.res, "Running program")
+
+ timeout := time.After(j.maxTime)
+ // User code execution is time limited in builder.
+ // This flag signals only unexpected timeouts. maxTime should be sufficient
+ // for end-to-end request processing by builder for worst-case user input.
+ // TODO(ivanpi): builder doesn't currently time compilation, so builder
+ // worst-case execution time is not clearly bounded.
+ timedOut := false
+
+ exit := make(chan error)
+ go func() { exit <- cmd.Run() }()
+
+ select {
+ case err := <-exit:
+ if err != nil && !sizedOut {
+ erroredOut = true
+ }
+ case <-timeout:
+ timedOut = true
+ cmdKill()
+ <-exit
+ }
+
+ // Close and wait for the output relay.
+ outStop()
+
+ event.Debug(j.res, "Program exited")
+
+ // Return the appropriate error message to the client.
+ if timedOut {
+ j.res.Write(event.New("", "stderr", "Internal timeout, please retry."))
+ } else if erroredOut {
+ j.res.Write(event.New("", "stderr", "Internal error, please retry."))
+ } else if sizedOut {
+ j.res.Write(event.New("", "stderr", "Program output too large, killed."))
+ }
+
+ // Log builder internal errors, if any.
+ // TODO(ivanpi): Prevent caching? Report to client if debug requested?
+ if errBuffer.Len() > 0 {
+ log.Println(j.id, "builder stderr:", errBuffer.String())
+ }
+
+ event.Debug(j.res, "Response finished")
+
+ // TODO(nlacasse): This "docker rm" can be slow (several seconds), and seems
+ // to block other Docker commands, thereby slowing down other concurrent
+ // requests. We should figure out how to make it not block other Docker
+ // commands. Setting GOMAXPROCS may or may not help.
+ // See: https://github.com/docker/docker/issues/6480
+ if j.useDocker {
+ go func() {
+ docker("rm", "-f", j.id).Run()
+ }()
+ }
+
+ // If we timed out or errored out, do not cache anything.
+ // TODO(sadovsky): This policy is helpful for development, but may not be wise
+ // for production. Revisit.
+ if !timedOut && !erroredOut {
+ return result{
+ Success: true,
+ Events: j.res.PopWrittenEvents(),
+ }
+ } else {
+ return result{
+ Success: false,
+ Events: nil,
+ }
+ }
+}
+
+func docker(args ...string) *exec.Cmd {
+ return exec.Command("docker", args...)
+}
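Reviewer note: the package comment describes the scheduling idea in prose (a buffered channel of free workers, a buffered channel of queued jobs, and a dispatcher goroutine pairing them up). The sketch below reduces that pattern to its core, without cancellation, Stop(), or Docker. Names and types here are illustrative, not the package's API; `enqueue` mirrors `Dispatcher.Enqueue`'s fail-fast behavior when the queue is full.

```go
package main

import "fmt"

// job carries its own result channel, mirroring jobqueue's resultChan
// (capacity 1 so the worker never blocks if nobody reads the result).
type job struct {
	id     int
	result chan string
}

// startDispatcher pairs free workers with queued jobs: a worker is read off
// workerQueue, a job off jobQueue, and the worker is pushed back when done.
func startDispatcher(numWorkers int, jobQueue chan job) {
	workerQueue := make(chan int, numWorkers)
	for i := 0; i < numWorkers; i++ {
		workerQueue <- i
	}
	go func() {
		for {
			w := <-workerQueue // wait for a free worker
			j := <-jobQueue    // wait for the next job
			go func(w int, j job) {
				j.result <- fmt.Sprintf("worker %d ran job %d", w, j.id)
				workerQueue <- w // worker is free again
			}(w, j)
		}
	}()
}

// enqueue fails fast when the queue is full, like Dispatcher.Enqueue.
func enqueue(jobQueue chan job, j job) (chan string, error) {
	select {
	case jobQueue <- j:
		return j.result, nil
	default:
		return nil, fmt.Errorf("job queue full")
	}
}

func main() {
	jobQueue := make(chan job, 10)
	startDispatcher(2, jobQueue)

	var results []chan string
	for i := 0; i < 5; i++ {
		rc, err := enqueue(jobQueue, job{id: i, result: make(chan string, 1)})
		if err != nil {
			fmt.Println(err)
			continue
		}
		results = append(results, rc)
	}
	for _, rc := range results {
		fmt.Println(<-rc)
	}
}
```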
diff --git a/go/src/playground/compilerd/main.go b/go/src/playground/compilerd/main.go
index 26a8403..3e01795 100644
--- a/go/src/playground/compilerd/main.go
+++ b/go/src/playground/compilerd/main.go
@@ -4,6 +4,9 @@
// HTTP server for saving, loading, and executing playground examples.
+// TODO(nlacasse,ivanpi): The word "compile" is no longer appropriate for what
+// this server does. Rename to something better.
+
package main
import (
@@ -20,10 +23,17 @@
"net/http"
"os"
"os/signal"
+ "runtime"
"syscall"
"time"
)
+func init() {
+ if os.Getenv("GOMAXPROCS") == "" {
+ runtime.GOMAXPROCS(runtime.NumCPU())
+ }
+}
+
var (
// This channel is closed when clean exit is triggered.
// No values are ever sent to it.
@@ -33,18 +43,19 @@
// compilerd exits cleanly on SIGTERM or after a random amount of time,
// between listenTimeout/2 and listenTimeout.
- listenTimeout = flag.Duration("listenTimeout", 60*time.Minute, "Maximum amount of time to listen for before exiting. A value of 0 disables the timeout.")
+ listenTimeout = flag.Duration("listen-timeout", 60*time.Minute, "Maximum amount of time to listen for before exiting. A value of 0 disables the timeout.")
- // Maximum request and output size. Same limit as imposed by Go tour.
+ // Maximum request and output size.
+ // 1<<16 is the same limit as imposed by Go tour.
// Note: The response includes error and status messages as well as output,
// so it can be larger (usually by a small constant, hard limited to
// 2*maxSize).
// maxSize should be large enough to fit all error and status messages
// written by compilerd to prevent reaching the hard limit.
- maxSize = 1 << 16
+ maxSize = flag.Int("max-size", 1<<16, "Maximum request and output size.")
- // Time to finish serving currently running requests before exiting cleanly.
- // No new requests are accepted during this time.
+ // Maximum time to finish serving currently running requests before exiting
+ // cleanly. No new requests are accepted during this time.
exitDelay = 30 * time.Second
)
@@ -59,18 +70,6 @@
return nil
}
-//////////////////////////////////////////
-// HTTP server
-
-func healthz(w http.ResponseWriter, r *http.Request) {
- select {
- case <-lameduck:
- w.WriteHeader(http.StatusInternalServerError)
- default:
- w.Write([]byte("ok"))
- }
-}
-
func main() {
flag.Parse()
@@ -93,13 +92,17 @@
log.Fatal(err)
}
+ startDispatcher()
+
http.HandleFunc("/compile", handlerCompile)
http.HandleFunc("/load", handlerLoad)
http.HandleFunc("/save", handlerSave)
- http.HandleFunc("/healthz", healthz)
+ http.HandleFunc("/healthz", handlerHealthz)
log.Printf("Serving %s\n", *address)
- http.ListenAndServe(*address, nil)
+ if err := http.ListenAndServe(*address, nil); err != nil {
+ panic(err)
+ }
}
func waitForExit(limit time.Duration) {
@@ -114,10 +117,10 @@
for {
select {
case <-deadline:
- log.Println("Deadline expired, exiting in", exitDelay)
+ log.Println("Deadline expired, exiting in at most", exitDelay)
break Loop
case <-term:
- log.Println("Got SIGTERM, exiting in", exitDelay)
+ log.Println("Got SIGTERM, exiting in at most", exitDelay)
break Loop
}
}
@@ -125,8 +128,20 @@
// Fail health checks so we stop getting requests.
close(lameduck)
- // Give running requests time to finish.
- time.Sleep(exitDelay)
+	go func() {
+		<-time.After(exitDelay)
+		log.Printf("Dispatcher did not stop in %v, exiting.", exitDelay)
+		os.Exit(1)
+	}()
+
+ // Stop the dispatcher and wait for all in-progress jobs to finish.
+ dispatcher.Stop()
+
+	// Give the server some extra time to send any remaining responses that are
+ // queued to be sent.
+ time.Sleep(2 * time.Second)
os.Exit(0)
}
@@ -179,6 +194,15 @@
return buf.Bytes()
}
+func handlerHealthz(w http.ResponseWriter, r *http.Request) {
+ select {
+ case <-lameduck:
+ w.WriteHeader(http.StatusInternalServerError)
+ default:
+ w.Write([]byte("ok"))
+ }
+}
+
//////////////////////////////////////////
// Shared helper functions
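Reviewer note: the shutdown path in main.go now has several moving parts: a `lameduck` channel that is closed so `/healthz` starts failing, a SIGTERM (or deadline) trigger, a watchdog that force-exits if draining takes longer than `exitDelay`, and a short grace period for queued responses. The condensed, self-contained sketch below shows just the lameduck pattern; it omits the listen-timeout deadline and the dispatcher, and the port and delays are illustrative.

```go
package main

import (
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

var lameduck = make(chan struct{}) // closed when clean exit is triggered

func handlerHealthz(w http.ResponseWriter, r *http.Request) {
	select {
	case <-lameduck:
		// Fail health checks so the load balancer stops sending requests.
		w.WriteHeader(http.StatusInternalServerError)
	default:
		w.Write([]byte("ok"))
	}
}

func main() {
	exitDelay := 30 * time.Second

	http.HandleFunc("/healthz", handlerHealthz)
	go func() {
		if err := http.ListenAndServe(":8181", nil); err != nil {
			log.Fatal(err)
		}
	}()

	// Wait for SIGTERM, then enter lameduck mode.
	term := make(chan os.Signal, 1)
	signal.Notify(term, syscall.SIGTERM)
	<-term
	log.Println("Got SIGTERM, exiting in at most", exitDelay)
	close(lameduck)

	// Watchdog: force-exit if draining takes too long.
	time.AfterFunc(exitDelay, func() {
		log.Printf("Did not drain in %v, exiting.", exitDelay)
		os.Exit(1)
	})

	// In compilerd this is where dispatcher.Stop() drains in-flight jobs.
	time.Sleep(2 * time.Second) // grace period for queued responses
	os.Exit(0)
}
```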
diff --git a/go/src/playground/compilerd/storage.go b/go/src/playground/compilerd/storage.go
index ec3778c..6a7d960 100644
--- a/go/src/playground/compilerd/storage.go
+++ b/go/src/playground/compilerd/storage.go
@@ -203,11 +203,11 @@
// Check method and read POST body.
// Limit is set to maxSize+1 to allow distinguishing between exactly maxSize
// and larger than maxSize requests.
- requestBody := getPostBody(w, r, maxSize+1)
+ requestBody := getPostBody(w, r, *maxSize+1)
if requestBody == nil {
return
}
- if len(requestBody) > maxSize {
+ if len(requestBody) > *maxSize {
storageError(w, http.StatusBadRequest, "Program too large.")
return
}
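Reviewer note: the `*maxSize+1` limit used in both handlers is worth spelling out: reading one byte more than the limit is what lets the server tell an exactly-`maxSize` request apart from an oversized one. The sketch below is an illustrative stand-in for the `getPostBody` helper (the real helper lives in main.go and may differ); the route and handler body are placeholders.

```go
package main

import (
	"bytes"
	"io"
	"net/http"
)

const maxSize = 1 << 16

// readPostBody reads at most limit bytes of the POST body, replying with an
// error and returning nil if the method is wrong or the read fails.
func readPostBody(w http.ResponseWriter, r *http.Request, limit int64) []byte {
	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return nil
	}
	var buf bytes.Buffer
	if _, err := buf.ReadFrom(io.LimitReader(r.Body, limit)); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return nil
	}
	return buf.Bytes()
}

func handler(w http.ResponseWriter, r *http.Request) {
	// Read maxSize+1 bytes so a body of exactly maxSize bytes can be told
	// apart from one that is larger than maxSize.
	body := readPostBody(w, r, maxSize+1)
	if body == nil {
		return
	}
	if len(body) > maxSize {
		http.Error(w, "Program too large.", http.StatusBadRequest)
		return
	}
	w.Write(body) // placeholder for real request handling
}

func main() {
	http.HandleFunc("/save", handler)
	http.ListenAndServe(":8181", nil)
}
```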
diff --git a/go/src/playground/lib/event/response_event_sink.go b/go/src/playground/lib/event/response_event_sink.go
new file mode 100644
index 0000000..6e69972
--- /dev/null
+++ b/go/src/playground/lib/event/response_event_sink.go
@@ -0,0 +1,92 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package event
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "io"
+ "sync"
+
+ "playground/lib"
+)
+
+// Initialize using NewResponseEventSink.
+// An event.Sink which also saves all written Events regardless of successful
+// writes to the underlying ResponseWriter.
+type ResponseEventSink struct {
+ // The mutex is used to ensure the same sequence of events being written to
+ // both the JsonSink and the written Event array.
+ mu sync.Mutex
+ JsonSink
+ written []Event
+}
+
+func NewResponseEventSink(writer io.Writer, filterDebug bool) *ResponseEventSink {
+ return &ResponseEventSink{
+ JsonSink: *NewJsonSink(writer, filterDebug),
+ }
+}
+
+func (r *ResponseEventSink) Write(events ...Event) error {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.written = append(r.written, events...)
+ return r.JsonSink.Write(events...)
+}
+
+// Returns and clears the history of Events written to the ResponseEventSink.
+func (r *ResponseEventSink) PopWrittenEvents() []Event {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ events := r.written
+ r.written = nil
+ return events
+}
+
+// Each line written to the returned writer, up to limit bytes total, is parsed
+// into an Event and written to Sink.
+// If the limit is reached or an invalid line read, the corresponding callback
+// is called and the relay stopped.
+// The returned stop() function stops the relaying.
+func LimitedEventRelay(sink Sink, limit int, limitCallback func(), errorCallback func(err error)) (writer io.Writer, stop func()) {
+ pipeReader, pipeWriter := io.Pipe()
+ done := make(chan bool)
+ stop = lib.DoOnce(func() {
+ // Closing the pipe will cause the main relay loop to stop reading (EOF).
+ // Writes will fail with ErrClosedPipe.
+ pipeReader.Close()
+ pipeWriter.Close()
+ // Wait for the relay goroutine to finish.
+ <-done
+ })
+ writer = lib.NewLimitedWriter(pipeWriter, limit, func() {
+ limitCallback()
+ stop()
+ })
+ go func() {
+ bufr := bufio.NewReaderSize(pipeReader, limit)
+ var line []byte
+ var err error
+ // Relay complete lines (events) until EOF or a read error is encountered.
+ for line, err = bufr.ReadBytes('\n'); err == nil; line, err = bufr.ReadBytes('\n') {
+ var e Event
+ err = json.Unmarshal(line, &e)
+ if err != nil {
+ err = fmt.Errorf("failed unmarshalling event: %q", line)
+ break
+ }
+ sink.Write(e)
+ }
+ if err != io.EOF && err != io.ErrClosedPipe {
+ errorCallback(err)
+ // Use goroutine to prevent deadlock on done channel.
+ go stop()
+ }
+ done <- true
+ }()
+ return
+}
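Reviewer note: LimitedEventRelay combines an io.Pipe (so arbitrary writers such as cmd.Stdout can feed the relay), a LimitedWriter that fires a callback at the byte limit, and a goroutine that parses newline-delimited JSON into Events. The reduced sketch below shows only the newline-delimited JSON relay over a pipe; the limit and callback machinery is omitted, and the `event` type's fields here are assumed placeholders, not the real Event fields.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"log"
)

// event is a placeholder for playground's event.Event type.
type event struct {
	Stream  string `json:"stream"`
	Message string `json:"message"`
}

// relayEvents returns a writer; every newline-terminated line written to it is
// unmarshalled as an event and passed to sink. stop closes the pipe and waits
// for the relay goroutine to drain.
func relayEvents(sink func(event)) (writer io.WriteCloser, stop func()) {
	pr, pw := io.Pipe()
	done := make(chan struct{})
	go func() {
		defer close(done)
		scanner := bufio.NewScanner(pr)
		for scanner.Scan() {
			var e event
			if err := json.Unmarshal(scanner.Bytes(), &e); err != nil {
				log.Println("relay: bad event line:", err)
				return
			}
			sink(e)
		}
	}()
	stop = func() {
		pw.Close()
		pr.Close()
		<-done
	}
	return pw, stop
}

func main() {
	w, stop := relayEvents(func(e event) {
		fmt.Printf("[%s] %s\n", e.Stream, e.Message)
	})
	fmt.Fprintln(w, `{"stream":"stdout","message":"hello"}`)
	fmt.Fprintln(w, `{"stream":"stderr","message":"world"}`)
	stop()
}
```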