blob: a8b494de24e7f2e752bd158cf3acf69040f38a4b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package concurrency
import (
"fmt"
"sort"
)
// execution represents an execution of the test.
type execution struct {
// strategy describes the initial sequence of scheduling decisions
// to make.
strategy []TID
// nsteps records the number of scheduling decision made.
nsteps int
// nthreads records the number of threads in the system.
nthreads int
// nrequests records the number of currently pending requests.
nrequests int
// requests is a channel on which scheduling requests are received.
requests chan request
// done is a channel that the request handlers can use to wake up
// the main scheduling loop.
done chan struct{}
// nextTID is a function that can be used to generate unique thread
// identifiers.
nextTID func() TID
// activeTID records the identifier of the currently active thread.
activeTID TID
// ctx stores the abstract state of resources used by the
// execution.
ctx *context
// threads records the abstract state of threads active in the
// execution.
threads map[TID]*thread
}
// newExecution is the execution factory.
func newExecution(strategy []TID) *execution {
execution := &execution{
strategy: strategy,
nthreads: 1,
requests: make(chan request),
nextTID: TIDGenerator(),
ctx: newContext(),
threads: make(map[TID]*thread),
}
clock := newClock()
clock[0] = 0
execution.threads[0] = newThread(0, clock)
return execution
}
// Run executes the body of the test, exploring a sequence of
// scheduling decisions, and returns a vector of the scheduling
// decisions it made as well as the alternative scheduling decisions
// it could have made instead.
func (e *execution) Run(testBody func()) []*choice {
go testBody()
choices := make([]*choice, 0)
// Keep scheduling requests until there are threads left in the
// system.
for e.nthreads != 0 {
// Keep receiving scheduling requests until all threads are
// blocked on a decision.
for e.nrequests != e.nthreads {
request, ok := <-e.requests
if !ok {
panic("Command channel closed unexpectedly.")
}
e.nrequests++
request.process(e)
}
choice := e.generateChoice()
choices = append(choices, choice)
e.activeTID = choice.next
e.schedule(choice.next)
}
return choices
}
// findThread uses the given thread identifier to find a thread among
// the known threads.
func (e *execution) findThread(tid TID) *thread {
thread, ok := e.threads[tid]
if !ok {
panic(fmt.Sprintf("Could not find thread %v.", tid))
}
return thread
}
// generateChoice describes the scheduling choices available at the
// current abstract program state.
func (e *execution) generateChoice() *choice {
c := newChoice()
enabled := make([]TID, 0)
for tid, thread := range e.threads {
t := &transition{
tid: tid,
clock: thread.clock.clone(),
enabled: thread.enabled(e.ctx),
kind: thread.kind(),
readSet: thread.readSet(),
writeSet: thread.writeSet(),
}
c.transitions[tid] = t
if t.enabled {
enabled = append(enabled, tid)
}
}
if len(c.transitions) == 0 {
panic("Encountered a deadlock.")
}
if e.nsteps < len(e.strategy) {
// Follow the scheduling strategy.
c.next = e.strategy[e.nsteps]
} else {
// Schedule an enabled thread using a deterministic round-robin
// scheduler.
sort.Sort(IncreasingTID(enabled))
index := 0
for ; index < len(enabled) && enabled[index] <= e.activeTID; index++ {
}
if index == len(enabled) {
index = 0
}
c.next = enabled[index]
}
return c
}
// schedule advances the execution of the given thread.
func (e *execution) schedule(tid TID) {
e.nrequests--
e.nsteps++
thread, ok := e.threads[tid]
if !ok {
panic(fmt.Sprintf("Could not find thread %v.\n", tid))
}
if !thread.enabled(e.ctx) {
panic(fmt.Sprintf("Thread %v is about to be scheduled and is not enabled.", tid))
}
e.done = make(chan struct{})
close(thread.ready)
<-e.done
}