Merge "client/lib/shell.sh: VEYRON_* environment variables are now V23_*"
diff --git a/client/Makefile b/client/Makefile
index d86ce20..3e46715 100644
--- a/client/Makefile
+++ b/client/Makefile
@@ -21,7 +21,7 @@
 deploy-staging: all
 	git rev-parse --verify HEAD > public/version
 	gcloud config set project vanadium-staging
-	gsutil -m rsync -d -r public gs://staging.playground.v.io
+	gsutil -m rsync -d -r public gs://playground.staging.v.io
 
 public/bundle.js: browser/index.js $(js_files) node_modules
 	browserify --debug $< 1> $@
diff --git a/go/src/playground/Makefile b/go/src/playground/Makefile
index b4f2814..4fabbb8 100644
--- a/go/src/playground/Makefile
+++ b/go/src/playground/Makefile
@@ -19,7 +19,7 @@
 	compilerd \
 		--sqlconf=$< \
 		--setupdb=true \
-		--listenTimeout=0 \
+		--listen-timeout=0 \
 		--address=$(host):$(port) \
 		--use-docker=false
 
diff --git a/go/src/playground/README.md b/go/src/playground/README.md
index a49e371..0cab349 100644
--- a/go/src/playground/README.md
+++ b/go/src/playground/README.md
@@ -53,12 +53,12 @@
 
 Run the compiler binary:
 
-    $ $VANADIUM_ROOT/release/projects/playground/go/bin/compilerd --listenTimeout=0 --address=localhost:8181
+    $ $VANADIUM_ROOT/release/projects/playground/go/bin/compilerd --listen-timeout=0 --address=localhost:8181
 
 Or, run it without Docker (for faster iterations during development):
 
     $ cd $(mktemp -d "/tmp/XXXXXXXX")
-    $ PATH=$VANADIUM_ROOT/release/go/bin:$VANADIUM_ROOT/release/projects/playground/go/bin:$PATH compilerd --listenTimeout=0 --address=localhost:8181 --use-docker=false
+    $ PATH=$VANADIUM_ROOT/release/go/bin:$VANADIUM_ROOT/release/projects/playground/go/bin:$PATH compilerd --listen-timeout=0 --address=localhost:8181 --use-docker=false
 
 The server should now be running at http://localhost:8181 and responding to
 compile requests at http://localhost:8181/compile.
diff --git a/go/src/playground/compilerd/compile.go b/go/src/playground/compilerd/compile.go
index 7a164d0..217b15a 100644
--- a/go/src/playground/compilerd/compile.go
+++ b/go/src/playground/compilerd/compile.go
@@ -12,43 +12,60 @@
 package main
 
 import (
-	"bufio"
-	"bytes"
-	"encoding/json"
 	"flag"
-	"fmt"
-	"io"
 	"log"
 	"net/http"
-	"os/exec"
-	"sync"
 	"time"
 
 	"github.com/golang/groupcache/lru"
 
+	"playground/compilerd/jobqueue"
 	"playground/lib"
 	"playground/lib/event"
 )
 
 var (
+	// In-memory LRU cache of request/response bodies. Keys are sha256 sums of
+	// request bodies (32 bytes each), values are of type cachedResponse.
+	// NOTE(nlacasse): The cache size (10k) was chosen arbitrarily and should
+	// perhaps be optimized.
+	cache = lru.New(10000)
+
+	// dispatcher schedules jobs to be run on a fixed number of workers.
+	dispatcher *jobqueue.Dispatcher
+
 	useDocker = flag.Bool("use-docker", true, "Whether to use Docker to run builder; if false, we run the builder directly.")
 
+	// TODO(nlacasse): Experiment with different values for parallelism and
+	// dockerMemLimit once we have performance testing.
+	// There should be some memory left over for the system, http server, and
+	// docker daemon. The GCE n1-standard machines have 3.75GB of RAM, so the
+	// default value below should leave plenty of room.
+	parallelism    = flag.Int("parallelism", 5, "Maximum number of builds to run in parallel.")
+	dockerMemLimit = flag.Int("total-docker-memory", 3000, "Total memory limit for all Docker build instances in MB.")
+
 	// Arbitrary deadline (enough to compile, run, shutdown).
 	// TODO(sadovsky): For now this is set high to avoid spurious timeouts.
 	// Playground execution speed needs to be optimized.
-	maxTime = 10 * time.Second
+	maxTime = flag.Duration("max-time", 10*time.Second, "Maximum time for build to run.")
 
-	// In-memory LRU cache of request/response bodies. Keys are sha256 sums of
-	// request bodies (32 bytes each), values are of type CachedResponse.
-	// NOTE(nlacasse): The cache size (10k) was chosen arbitrarily and should
-	// perhaps be optimized.
-	cache = lru.New(10000)
+	// TODO(nlacasse): The default value of 100 was chosen arbitrarily and
+	// should be tuned.
+	jobQueueCap = flag.Int("job-queue-capacity", 100, "Maximum number of jobs to allow in the job queue. Attempting to add a new job will fail if the queue is full.")
 )
 
-//////////////////////////////////////////
-// HTTP request handler
+// cachedResponse is the type of values stored in the lru cache.
+type cachedResponse struct {
+	Status int
+	Events []event.Event
+}
 
-// POST request that compiles and runs the bundle and streams output to client.
+func startDispatcher() {
+	dispatcher = jobqueue.NewDispatcher(*parallelism, *jobQueueCap)
+}
+
+// POST request that returns cached results if any exist, otherwise schedules
+// the bundle to be run and caches the results.
 func handlerCompile(w http.ResponseWriter, r *http.Request) {
 	if !handleCORS(w, r) {
 		return
@@ -57,7 +74,7 @@
 	// Check method and read POST body.
 	// Limit is set to maxSize+1 to allow distinguishing between exactly maxSize
 	// and larger than maxSize requests.
-	requestBody := getPostBody(w, r, maxSize+1)
+	requestBody := getPostBody(w, r, *maxSize+1)
 	if requestBody == nil {
 		return
 	}
@@ -67,18 +84,18 @@
 	// sensitive information, so guarding with a query parameter is sufficient.
 	wantDebug := r.FormValue("debug") == "1"
 
-	openResponse := func(status int) *responseEventSink {
+	openResponse := func(status int) *event.ResponseEventSink {
 		w.Header().Add("Content-Type", "application/json")
 		// No Content-Length, using chunked encoding.
 		w.WriteHeader(status)
 		// The response is hard limited to 2*maxSize: maxSize for builder stdout,
 		// and another maxSize for compilerd error and status messages.
-		return newResponseEventSink(lib.NewLimitedWriter(w, 2*maxSize, lib.DoOnce(func() {
+		return event.NewResponseEventSink(lib.NewLimitedWriter(w, 2*(*maxSize), lib.DoOnce(func() {
 			log.Println("Hard response size limit reached.")
 		})), !wantDebug)
 	}
 
-	if len(requestBody) > maxSize {
+	if len(requestBody) > *maxSize {
 		res := openResponse(http.StatusBadRequest)
 		res.Write(event.New("", "stderr", "Program too large."))
 		return
@@ -89,251 +106,59 @@
 	// NOTE(sadovsky): In the client we may shift timestamps (based on current
 	// time) and introduce a fake delay.
 	requestBodyHash := rawHash(requestBody)
-	if cachedResponse, ok := cache.Get(requestBodyHash); ok {
-		if cachedResponseStruct, ok := cachedResponse.(CachedResponse); ok {
+	if cr, ok := cache.Get(requestBodyHash); ok {
+		if cachedResponseStruct, ok := cr.(cachedResponse); ok {
 			res := openResponse(cachedResponseStruct.Status)
 			event.Debug(res, "Sending cached response")
 			res.Write(cachedResponseStruct.Events...)
 			return
 		} else {
-			log.Panicf("Invalid cached response: %v\n", cachedResponse)
+			log.Panicf("Invalid cached response: %v\n", cr)
 		}
 	}
 
 	res := openResponse(http.StatusOK)
 
-	id := <-uniq
-
-	event.Debug(res, "Preparing to run program")
-
-	// TODO(sadovsky): Set runtime constraints on CPU and memory usage.
-	// http://docs.docker.com/reference/run/#runtime-constraints-on-cpu-and-memory
-	var cmd *exec.Cmd
-	if *useDocker {
-		cmd = docker("run", "-i", "--name", id, "playground")
-	} else {
-		cmd = exec.Command("builder")
-	}
-	cmdKill := lib.DoOnce(func() {
-		event.Debug(res, "Killing program")
-		// The docker client can get in a state where stopping/killing/rm-ing
-		// the container will not kill the client. The opposite should work
-		// correctly (killing the docker client stops the container).
-		// If not, the docker rm call below will.
-		// Note, this wouldn't be sufficient if docker was called through sudo
-		// since sudo doesn't pass sigkill to child processes.
-		cmd.Process.Kill()
-	})
-
-	cmd.Stdin = bytes.NewReader(requestBody)
-
-	// Builder will return all normal output as JSON Events on stdout, and will
-	// return unexpected errors on stderr.
-	// TODO(sadovsky): Security issue: what happens if the program output is huge?
-	// We can restrict memory use of the Docker container, but these buffers are
-	// outside Docker.
-	// TODO(ivanpi): Revisit above comment.
-	sizedOut := false
-	erroredOut := false
-
-	userLimitCallback := func() {
-		sizedOut = true
-		cmdKill()
-	}
-	systemLimitCallback := func() {
-		erroredOut = true
-		cmdKill()
-	}
-	userErrorCallback := func(err error) {
-		// A relay error can result from unparseable JSON caused by a builder bug
-		// or a malicious exploit inside Docker. Panicking could lead to a DoS.
-		log.Println(id, "builder stdout relay error:", err)
-		erroredOut = true
-		cmdKill()
+	// Calculate memory limit for the docker instance running this job.
+	dockerMem := *dockerMemLimit
+	if *parallelism > 0 {
+		dockerMem /= *parallelism
 	}
 
-	outRelay, outStop := limitedEventRelay(res, maxSize, userLimitCallback, userErrorCallback)
-	// Builder stdout should already contain a JSON Event stream.
-	cmd.Stdout = outRelay
-
-	// Any stderr is unexpected, most likely a bug (panic) in builder, but could
-	// also result from a malicious exploit inside Docker.
-	// It is quietly logged as long as it doesn't exceed maxSize.
-	errBuffer := new(bytes.Buffer)
-	cmd.Stderr = lib.NewLimitedWriter(errBuffer, maxSize, systemLimitCallback)
-
-	event.Debug(res, "Running program")
-
-	timeout := time.After(maxTime)
-	// User code execution is time limited in builder.
-	// This flag signals only unexpected timeouts. maxTime should be sufficient
-	// for end-to-end request processing by builder for worst-case user input.
-	// TODO(ivanpi): builder doesn't currently time compilation, so builder
-	// worst-case execution time is not clearly bounded.
-	timedOut := false
-
-	exit := make(chan error)
-	go func() { exit <- cmd.Run() }()
-
-	select {
-	case err := <-exit:
-		if err != nil && !sizedOut {
-			erroredOut = true
-		}
-	case <-timeout:
-		timedOut = true
-		cmdKill()
-		<-exit
+	// Create a new compile job and queue it.
+	job := jobqueue.NewJob(requestBody, res, *maxSize, *maxTime, *useDocker, dockerMem)
+	resultChan, err := dispatcher.Enqueue(job)
+	if err != nil {
+		// TODO(nlacasse): This should send a StatusServiceUnavailable, not a StatusOK.
+		res.Write(event.New("", "stderr", "Service busy. Please try again later."))
+		return
 	}
 
-	// Close and wait for the output relay.
-	outStop()
+	clientDisconnect := w.(http.CloseNotifier).CloseNotify()
 
-	event.Debug(res, "Program exited")
-
-	// Return the appropriate error message to the client.
-	if timedOut {
-		res.Write(event.New("", "stderr", "Internal timeout, please retry."))
-	} else if erroredOut {
-		res.Write(event.New("", "stderr", "Internal error, please retry."))
-	} else if sizedOut {
-		res.Write(event.New("", "stderr", "Program output too large, killed."))
-	}
-
-	// Log builder internal errors, if any.
-	// TODO(ivanpi): Prevent caching? Report to client if debug requested?
-	if errBuffer.Len() > 0 {
-		log.Println(id, "builder stderr:", errBuffer.String())
-	}
-
-	event.Debug(res, "Response finished")
-
-	// If we timed out or errored out, do not cache anything.
-	// TODO(sadovsky): This policy is helpful for development, but may not be wise
-	// for production. Revisit.
-	if !timedOut && !erroredOut {
-		cache.Add(requestBodyHash, CachedResponse{
-			Status: http.StatusOK,
-			Events: res.popWrittenEvents(),
-		})
-		event.Debug(res, "Caching response")
-	} else {
-		event.Debug(res, "Internal errors encountered, not caching response")
-	}
-
-	// TODO(nlacasse): This "docker rm" can be slow (several seconds), and seems
-	// to block other Docker commands, thereby slowing down other concurrent
-	// requests. We should figure out how to make it not block other Docker
-	// commands. Setting GOMAXPROCS may or may not help.
-	// See: https://github.com/docker/docker/issues/6480
-	if *useDocker {
-		go func() {
-			docker("rm", "-f", id).Run()
-		}()
-	}
-}
-
-//////////////////////////////////////////
-// Event write and cache support
-
-type CachedResponse struct {
-	Status int
-	Events []event.Event
-}
-
-// Each line written to the returned writer, up to limit bytes total, is parsed
-// into an Event and written to Sink.
-// If the limit is reached or an invalid line read, the corresponding callback
-// is called and the relay stopped.
-// The returned stop() function stops the relaying.
-func limitedEventRelay(sink event.Sink, limit int, limitCallback func(), errorCallback func(err error)) (writer io.Writer, stop func()) {
-	pipeReader, pipeWriter := io.Pipe()
-	done := make(chan bool)
-	stop = lib.DoOnce(func() {
-		// Closing the pipe will cause the main relay loop to stop reading (EOF).
-		// Writes will fail with ErrClosedPipe.
-		pipeReader.Close()
-		pipeWriter.Close()
-		// Wait for the relay goroutine to finish.
-		<-done
-	})
-	writer = lib.NewLimitedWriter(pipeWriter, limit, func() {
-		limitCallback()
-		stop()
-	})
-	go func() {
-		bufr := bufio.NewReaderSize(pipeReader, limit)
-		var line []byte
-		var err error
-		// Relay complete lines (events) until EOF or a read error is encountered.
-		for line, err = bufr.ReadBytes('\n'); err == nil; line, err = bufr.ReadBytes('\n') {
-			var e event.Event
-			err = json.Unmarshal(line, &e)
-			if err != nil {
-				err = fmt.Errorf("failed unmarshalling event: %q", line)
-				break
+	// Wait for job to finish and cache results if job succeeded.
+	// We do this in a for loop because we always need to wait for the result
+	// before closing the http handler, since Go panics when writing to the
+	// response after the handler has exited.
+	for {
+		select {
+		case <-clientDisconnect:
+			// If the client disconnects before job finishes, cancel the job.
+			// If job has already started, the job will finish and the results
+			// will be cached.
+			log.Printf("Client disconnected. Cancelling job.")
+			job.Cancel()
+		case result := <-resultChan:
+			if result.Success {
+				event.Debug(res, "Caching response")
+				cache.Add(requestBodyHash, cachedResponse{
+					Status: http.StatusOK,
+					Events: result.Events,
+				})
+			} else {
+				event.Debug(res, "Internal errors encountered, not caching response")
 			}
-			sink.Write(e)
+			return
 		}
-		if err != io.EOF && err != io.ErrClosedPipe {
-			errorCallback(err)
-			// Use goroutine to prevent deadlock on done channel.
-			go stop()
-		}
-		done <- true
-	}()
-	return
-}
-
-// Initialize using newResponseEventSink.
-// An event.Sink which also saves all written Events regardless of successful
-// writes to the underlying ResponseWriter.
-type responseEventSink struct {
-	// The mutex is used to ensure the same sequence of events being written to
-	// both the JsonSink and the written Event array.
-	mu sync.Mutex
-	event.JsonSink
-	written []event.Event
-}
-
-func newResponseEventSink(writer io.Writer, filterDebug bool) *responseEventSink {
-	return &responseEventSink{
-		JsonSink: *event.NewJsonSink(writer, filterDebug),
 	}
 }
-
-func (r *responseEventSink) Write(events ...event.Event) error {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	r.written = append(r.written, events...)
-	return r.JsonSink.Write(events...)
-}
-
-// Returns and clears the history of Events written to the responseEventSink.
-func (r *responseEventSink) popWrittenEvents() []event.Event {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	events := r.written
-	r.written = nil
-	return events
-}
-
-//////////////////////////////////////////
-// Miscellaneous helper functions
-
-func docker(args ...string) *exec.Cmd {
-	return exec.Command("docker", args...)
-}
-
-// A channel which returns unique ids for the containers.
-var uniq = make(chan string)
-
-func init() {
-	val := time.Now().UnixNano()
-	go func() {
-		for {
-			uniq <- fmt.Sprintf("playground_%d", val)
-			val++
-		}
-	}()
-}
diff --git a/go/src/playground/compilerd/jobqueue/jobqueue.go b/go/src/playground/compilerd/jobqueue/jobqueue.go
new file mode 100644
index 0000000..e9e3145
--- /dev/null
+++ b/go/src/playground/compilerd/jobqueue/jobqueue.go
@@ -0,0 +1,376 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package jobqueue implements a job queue for jobs.
+//
+// Usage:
+//
+//   dispatcher := NewDispatcher(numWorkers, maxWaitingJobs)
+//
+//   job := NewJob(...)
+//   resultChan := dispatcher.Enqueue(job)
+//   r :=<- resultChan // r will contain the result of running the job.
+//
+//   dispatcher.Stop() // Waits for any in-progress jobs to finish, cancels any
+//                     // remaining jobs.
+//
+// Internally, the dispatcher has a channel of workers that represents a worker
+// queue, and a channel of jobs that represents a job queue.  The dispatcher
+// reads a  worker off the worker queue, and then reads a job off the job
+// queue, and runs that job on that worker. When the job finishes, the worker
+// is pushed back on to the worker queue.
+
+package jobqueue
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"os/exec"
+	"sync"
+	"time"
+
+	"playground/lib"
+	"playground/lib/event"
+)
+
+var (
+	// A channel which returns unique playground ids.
+	uniq = make(chan string)
+)
+
+func init() {
+	val := time.Now().UnixNano()
+	go func() {
+		for {
+			uniq <- fmt.Sprintf("playground_%d", val)
+			val++
+		}
+	}()
+}
+
+type job struct {
+	id         string
+	body       []byte
+	res        *event.ResponseEventSink
+	resultChan chan result
+
+	maxSize   int
+	maxTime   time.Duration
+	useDocker bool
+	dockerMem int
+
+	mu        sync.Mutex
+	cancelled bool
+}
+
+func NewJob(body []byte, res *event.ResponseEventSink, maxSize int, maxTime time.Duration, useDocker bool, dockerMem int) job {
+	return job{
+		id:        <-uniq,
+		body:      body,
+		res:       res,
+		maxSize:   maxSize,
+		maxTime:   maxTime,
+		useDocker: useDocker,
+		dockerMem: dockerMem,
+
+		// resultChan has capacity 1 so that writing to the channel won't block
+		// if nobody ever reads the result.
+		resultChan: make(chan result, 1),
+	}
+}
+
+// Cancel will prevent the job from being run, if it has not already been
+// started by a worker.
+func (j *job) Cancel() {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	log.Printf("Cancelling job %v.\n", j.id)
+	j.cancelled = true
+}
+
+type Dispatcher struct {
+	jobQueue chan job
+
+	// A message sent on the stopped channel causes the dispatcher to stop
+	// assigning new jobs to workers.
+	stopped chan bool
+
+	// wg represents currently running workers. It is used during Stop to make
+	// sure that all workers have finished running their active jobs.
+	wg sync.WaitGroup
+}
+
+func NewDispatcher(workers int, jobQueueCap int) *Dispatcher {
+	d := &Dispatcher{
+		jobQueue: make(chan job, jobQueueCap),
+		stopped:  make(chan bool),
+	}
+
+	d.start(workers)
+	return d
+}
+
+// start starts a given number of workers, then reads from the jobQueue and
+// assigns jobs to free workers.
+func (d *Dispatcher) start(num int) {
+	log.Printf("Dispatcher starting %d workers.\n", num)
+
+	// Workers are published on the workerQueue when they are free.
+	workerQueue := make(chan *worker, num)
+
+	for i := 0; i < num; i++ {
+		worker := newWorker(i)
+		workerQueue <- worker
+	}
+
+	d.wg.Add(1)
+
+	go func() {
+	Loop:
+		for {
+			// Wait for the next available worker.
+			select {
+			case <-d.stopped:
+				break Loop
+			case worker := <-workerQueue:
+				// Read the next job from the job queue.
+				select {
+				case <-d.stopped:
+					break Loop
+				case job := <-d.jobQueue:
+					job.mu.Lock()
+					cancelled := job.cancelled
+					job.mu.Unlock()
+					if cancelled {
+						log.Printf("Dispatcher encountered cancelled job %v, rejecting.", job.id)
+						job.resultChan <- result{
+							Success: false,
+							Events:  nil,
+						}
+					} else {
+						log.Printf("Dispatching job %v to worker %v.\n", job.id, worker.id)
+						d.wg.Add(1)
+						go func() {
+							job.resultChan <- worker.run(job)
+							d.wg.Done()
+							workerQueue <- worker
+						}()
+					}
+				}
+			}
+		}
+
+		log.Printf("Dispatcher stopped.\n")
+
+		// Dispatcher stopped, treat all remaining jobs as cancelled.
+		for {
+			select {
+			case job := <-d.jobQueue:
+				log.Printf("Dispatcher is stopped, rejecting job %v.\n", job.id)
+				job.resultChan <- result{
+					Success: false,
+					Events:  nil,
+				}
+			default:
+				log.Printf("Dispatcher job queue drained.\n")
+				d.wg.Done()
+				return
+			}
+		}
+	}()
+}
+
+// Stop stops the dispatcher from assigning any new jobs to workers. Jobs that
+// are currently running are allowed to continue. Other jobs are treated as
+// cancelled. Stop blocks until all jobs have finished.
+// TODO(nlacasse): Consider letting the dispatcher run all currently queued
+// jobs, rather than rejecting them.  Or, put logic in the client to retry
+// cancelled jobs.
+func (d *Dispatcher) Stop() {
+	log.Printf("Stopping dispatcher.\n")
+	d.stopped <- true
+
+	// Wait for workers to finish their current jobs.
+	d.wg.Wait()
+}
+
+// Enqueue queues a job to be run be the next available worker. It returns a
+// channel on which the job's results will be published.
+func (d *Dispatcher) Enqueue(j job) (chan result, error) {
+	select {
+	case d.jobQueue <- j:
+		return j.resultChan, nil
+	default:
+		return nil, fmt.Errorf("Error queuing job. Job queue full.")
+	}
+}
+
+type result struct {
+	Success bool
+	Events  []event.Event
+}
+
+type worker struct {
+	id int
+}
+
+func newWorker(id int) *worker {
+	return &worker{
+		id: id,
+	}
+}
+
+// run compiles and runs a job, caches the result, and returns the result on
+// the job's result channel.
+func (w *worker) run(j job) result {
+	event.Debug(j.res, "Preparing to run program")
+
+	memoryFlag := fmt.Sprintf("%dm", j.dockerMem)
+
+	var cmd *exec.Cmd
+	if j.useDocker {
+		// TODO(nlacasse,ivanpi): Limit the CPU resources used by this docker
+		// builder instance.  The docker "cpu-shares" flag can only limit on
+		// docker process relative to another, so it's not useful for limiting
+		// the cpu resources of all build instances. The docker "cpuset" flag
+		// can pin the instance to a specific processor, so that might be of
+		// use.
+		cmd = docker("run", "-i",
+			"--name", j.id,
+			// Disable external networking.
+			"--net", "none",
+			// Limit instance memory.
+			"--memory", memoryFlag,
+			// Limit instance memory+swap combined.
+			// Setting to the same value as memory effectively disables swap.
+			"--memory-swap", memoryFlag,
+			"playground")
+	} else {
+		cmd = exec.Command("builder")
+	}
+	cmdKill := lib.DoOnce(func() {
+		event.Debug(j.res, "Killing program")
+		// The docker client can get in a state where stopping/killing/rm-ing
+		// the container will not kill the client. The opposite should work
+		// correctly (killing the docker client stops the container).
+		// If not, the docker rm call below will.
+		// Note, this wouldn't be sufficient if docker was called through sudo
+		// since sudo doesn't pass sigkill to child processes.
+		cmd.Process.Kill()
+	})
+
+	cmd.Stdin = bytes.NewReader(j.body)
+
+	// Builder will return all normal output as JSON Events on stdout, and will
+	// return unexpected errors on stderr.
+	// TODO(sadovsky): Security issue: what happens if the program output is huge?
+	// We can restrict memory use of the Docker container, but these buffers are
+	// outside Docker.
+	// TODO(ivanpi): Revisit above comment.
+	sizedOut := false
+	erroredOut := false
+
+	userLimitCallback := func() {
+		sizedOut = true
+		cmdKill()
+	}
+	systemLimitCallback := func() {
+		erroredOut = true
+		cmdKill()
+	}
+	userErrorCallback := func(err error) {
+		// A relay error can result from unparseable JSON caused by a builder bug
+		// or a malicious exploit inside Docker. Panicking could lead to a DoS.
+		log.Println(j.id, "builder stdout relay error:", err)
+		erroredOut = true
+		cmdKill()
+	}
+
+	outRelay, outStop := event.LimitedEventRelay(j.res, j.maxSize, userLimitCallback, userErrorCallback)
+	// Builder stdout should already contain a JSON Event stream.
+	cmd.Stdout = outRelay
+
+	// Any stderr is unexpected, most likely a bug (panic) in builder, but could
+	// also result from a malicious exploit inside Docker.
+	// It is quietly logged as long as it doesn't exceed maxSize.
+	errBuffer := new(bytes.Buffer)
+	cmd.Stderr = lib.NewLimitedWriter(errBuffer, j.maxSize, systemLimitCallback)
+
+	event.Debug(j.res, "Running program")
+
+	timeout := time.After(j.maxTime)
+	// User code execution is time limited in builder.
+	// This flag signals only unexpected timeouts. maxTime should be sufficient
+	// for end-to-end request processing by builder for worst-case user input.
+	// TODO(ivanpi): builder doesn't currently time compilation, so builder
+	// worst-case execution time is not clearly bounded.
+	timedOut := false
+
+	exit := make(chan error)
+	go func() { exit <- cmd.Run() }()
+
+	select {
+	case err := <-exit:
+		if err != nil && !sizedOut {
+			erroredOut = true
+		}
+	case <-timeout:
+		timedOut = true
+		cmdKill()
+		<-exit
+	}
+
+	// Close and wait for the output relay.
+	outStop()
+
+	event.Debug(j.res, "Program exited")
+
+	// Return the appropriate error message to the client.
+	if timedOut {
+		j.res.Write(event.New("", "stderr", "Internal timeout, please retry."))
+	} else if erroredOut {
+		j.res.Write(event.New("", "stderr", "Internal error, please retry."))
+	} else if sizedOut {
+		j.res.Write(event.New("", "stderr", "Program output too large, killed."))
+	}
+
+	// Log builder internal errors, if any.
+	// TODO(ivanpi): Prevent caching? Report to client if debug requested?
+	if errBuffer.Len() > 0 {
+		log.Println(j.id, "builder stderr:", errBuffer.String())
+	}
+
+	event.Debug(j.res, "Response finished")
+
+	// TODO(nlacasse): This "docker rm" can be slow (several seconds), and seems
+	// to block other Docker commands, thereby slowing down other concurrent
+	// requests. We should figure out how to make it not block other Docker
+	// commands. Setting GOMAXPROCS may or may not help.
+	// See: https://github.com/docker/docker/issues/6480
+	if j.useDocker {
+		go func() {
+			docker("rm", "-f", j.id).Run()
+		}()
+	}
+
+	// If we timed out or errored out, do not cache anything.
+	// TODO(sadovsky): This policy is helpful for development, but may not be wise
+	// for production. Revisit.
+	if !timedOut && !erroredOut {
+		return result{
+			Success: true,
+			Events:  j.res.PopWrittenEvents(),
+		}
+	} else {
+		return result{
+			Success: false,
+			Events:  nil,
+		}
+	}
+}
+
+func docker(args ...string) *exec.Cmd {
+	return exec.Command("docker", args...)
+}
diff --git a/go/src/playground/compilerd/main.go b/go/src/playground/compilerd/main.go
index 26a8403..3e01795 100644
--- a/go/src/playground/compilerd/main.go
+++ b/go/src/playground/compilerd/main.go
@@ -4,6 +4,9 @@
 
 // HTTP server for saving, loading, and executing playground examples.
 
+// TODO(nlacasse,ivanpi): The word "compile" is no longer appropriate for what
+// this server does. Rename to something better.
+
 package main
 
 import (
@@ -20,10 +23,17 @@
 	"net/http"
 	"os"
 	"os/signal"
+	"runtime"
 	"syscall"
 	"time"
 )
 
+func init() {
+	if os.Getenv("GOMAXPROCS") == "" {
+		runtime.GOMAXPROCS(runtime.NumCPU())
+	}
+}
+
 var (
 	// This channel is closed when clean exit is triggered.
 	// No values are ever sent to it.
@@ -33,18 +43,19 @@
 
 	// compilerd exits cleanly on SIGTERM or after a random amount of time,
 	// between listenTimeout/2 and listenTimeout.
-	listenTimeout = flag.Duration("listenTimeout", 60*time.Minute, "Maximum amount of time to listen for before exiting. A value of 0 disables the timeout.")
+	listenTimeout = flag.Duration("listen-timeout", 60*time.Minute, "Maximum amount of time to listen for before exiting. A value of 0 disables the timeout.")
 
-	// Maximum request and output size. Same limit as imposed by Go tour.
+	// Maximum request and output size.
+	// 1<<16 is the same limit as imposed by Go tour.
 	// Note: The response includes error and status messages as well as output,
 	// so it can be larger (usually by a small constant, hard limited to
 	// 2*maxSize).
 	// maxSize should be large enough to fit all error and status messages
 	// written by compilerd to prevent reaching the hard limit.
-	maxSize = 1 << 16
+	maxSize = flag.Int("max-size", 1<<16, "Maximum request and output size.")
 
-	// Time to finish serving currently running requests before exiting cleanly.
-	// No new requests are accepted during this time.
+	// Maximum time to finish serving currently running requests before exiting
+	// cleanly. No new requests are accepted during this time.
 	exitDelay = 30 * time.Second
 )
 
@@ -59,18 +70,6 @@
 	return nil
 }
 
-//////////////////////////////////////////
-// HTTP server
-
-func healthz(w http.ResponseWriter, r *http.Request) {
-	select {
-	case <-lameduck:
-		w.WriteHeader(http.StatusInternalServerError)
-	default:
-		w.Write([]byte("ok"))
-	}
-}
-
 func main() {
 	flag.Parse()
 
@@ -93,13 +92,17 @@
 		log.Fatal(err)
 	}
 
+	startDispatcher()
+
 	http.HandleFunc("/compile", handlerCompile)
 	http.HandleFunc("/load", handlerLoad)
 	http.HandleFunc("/save", handlerSave)
-	http.HandleFunc("/healthz", healthz)
+	http.HandleFunc("/healthz", handlerHealthz)
 
 	log.Printf("Serving %s\n", *address)
-	http.ListenAndServe(*address, nil)
+	if err := http.ListenAndServe(*address, nil); err != nil {
+		panic(err)
+	}
 }
 
 func waitForExit(limit time.Duration) {
@@ -114,10 +117,10 @@
 	for {
 		select {
 		case <-deadline:
-			log.Println("Deadline expired, exiting in", exitDelay)
+			log.Println("Deadline expired, exiting in at most", exitDelay)
 			break Loop
 		case <-term:
-			log.Println("Got SIGTERM, exiting in", exitDelay)
+			log.Println("Got SIGTERM, exiting in at most", exitDelay)
 			break Loop
 		}
 	}
@@ -125,8 +128,20 @@
 	// Fail health checks so we stop getting requests.
 	close(lameduck)
 
-	// Give running requests time to finish.
-	time.Sleep(exitDelay)
+	go func() {
+		select {
+		case <-time.After(exitDelay):
+			fmt.Errorf("Dispatcher did not stop in %v, exiting.", exitDelay)
+			os.Exit(1)
+		}
+	}()
+
+	// Stop the dispatcher and wait for all in-progress jobs to finish.
+	dispatcher.Stop()
+
+	// Give the server some extra time to send any remaning responses that are
+	// queued to be sent.
+	time.Sleep(2 * time.Second)
 
 	os.Exit(0)
 }
@@ -179,6 +194,15 @@
 	return buf.Bytes()
 }
 
+func handlerHealthz(w http.ResponseWriter, r *http.Request) {
+	select {
+	case <-lameduck:
+		w.WriteHeader(http.StatusInternalServerError)
+	default:
+		w.Write([]byte("ok"))
+	}
+}
+
 //////////////////////////////////////////
 // Shared helper functions
 
diff --git a/go/src/playground/compilerd/storage.go b/go/src/playground/compilerd/storage.go
index ec3778c..6a7d960 100644
--- a/go/src/playground/compilerd/storage.go
+++ b/go/src/playground/compilerd/storage.go
@@ -203,11 +203,11 @@
 	// Check method and read POST body.
 	// Limit is set to maxSize+1 to allow distinguishing between exactly maxSize
 	// and larger than maxSize requests.
-	requestBody := getPostBody(w, r, maxSize+1)
+	requestBody := getPostBody(w, r, *maxSize+1)
 	if requestBody == nil {
 		return
 	}
-	if len(requestBody) > maxSize {
+	if len(requestBody) > *maxSize {
 		storageError(w, http.StatusBadRequest, "Program too large.")
 		return
 	}
diff --git a/go/src/playground/lib/event/response_event_sink.go b/go/src/playground/lib/event/response_event_sink.go
new file mode 100644
index 0000000..6e69972
--- /dev/null
+++ b/go/src/playground/lib/event/response_event_sink.go
@@ -0,0 +1,92 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package event
+
+import (
+	"bufio"
+	"encoding/json"
+	"fmt"
+	"io"
+	"sync"
+
+	"playground/lib"
+)
+
+// Initialize using NewResponseEventSink.
+// An event.Sink which also saves all written Events regardless of successful
+// writes to the underlying ResponseWriter.
+type ResponseEventSink struct {
+	// The mutex is used to ensure the same sequence of events being written to
+	// both the JsonSink and the written Event array.
+	mu sync.Mutex
+	JsonSink
+	written []Event
+}
+
+func NewResponseEventSink(writer io.Writer, filterDebug bool) *ResponseEventSink {
+	return &ResponseEventSink{
+		JsonSink: *NewJsonSink(writer, filterDebug),
+	}
+}
+
+func (r *ResponseEventSink) Write(events ...Event) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.written = append(r.written, events...)
+	return r.JsonSink.Write(events...)
+}
+
+// Returns and clears the history of Events written to the ResponseEventSink.
+func (r *ResponseEventSink) PopWrittenEvents() []Event {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	events := r.written
+	r.written = nil
+	return events
+}
+
+// Each line written to the returned writer, up to limit bytes total, is parsed
+// into an Event and written to Sink.
+// If the limit is reached or an invalid line read, the corresponding callback
+// is called and the relay stopped.
+// The returned stop() function stops the relaying.
+func LimitedEventRelay(sink Sink, limit int, limitCallback func(), errorCallback func(err error)) (writer io.Writer, stop func()) {
+	pipeReader, pipeWriter := io.Pipe()
+	done := make(chan bool)
+	stop = lib.DoOnce(func() {
+		// Closing the pipe will cause the main relay loop to stop reading (EOF).
+		// Writes will fail with ErrClosedPipe.
+		pipeReader.Close()
+		pipeWriter.Close()
+		// Wait for the relay goroutine to finish.
+		<-done
+	})
+	writer = lib.NewLimitedWriter(pipeWriter, limit, func() {
+		limitCallback()
+		stop()
+	})
+	go func() {
+		bufr := bufio.NewReaderSize(pipeReader, limit)
+		var line []byte
+		var err error
+		// Relay complete lines (events) until EOF or a read error is encountered.
+		for line, err = bufr.ReadBytes('\n'); err == nil; line, err = bufr.ReadBytes('\n') {
+			var e Event
+			err = json.Unmarshal(line, &e)
+			if err != nil {
+				err = fmt.Errorf("failed unmarshalling event: %q", line)
+				break
+			}
+			sink.Write(e)
+		}
+		if err != io.EOF && err != io.ErrClosedPipe {
+			errorCallback(err)
+			// Use goroutine to prevent deadlock on done channel.
+			go stop()
+		}
+		done <- true
+	}()
+	return
+}