// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package watchable
import (
// watcher maintains a set of watch clients receiving on update channels. When
// watcher is notified of a change in the store, it sends a value to all client
// channels that do not already have a value pending. This is done by a separate
// goroutine (watcherLoop) to move it off the broadcastUpdates() critical path.
type watcher struct {
// Channel used by broadcastUpdates() to notify watcherLoop. When watcher is
// closed, updater is closed.
updater chan struct{}
// Protects the clients map.
mu sync.RWMutex
// Currently registered clients, notified by watcherLoop via their channels.
// When watcher is closed, all clients are stopped (and their channels closed)
// with ErrAborted and clients is set to nil.
clients map[*Client]struct{}
func newWatcher() *watcher {
ret := &watcher{
updater: make(chan struct{}, 1),
clients: make(map[*Client]struct{}),
go ret.watcherLoop()
return ret
// close closes the watcher. Idempotent.
func (w *watcher) close() {
if w.clients != nil {
// Stop all clients and close their channels.
for c := range w.clients {
// Set clients to nil to mark watcher as closed.
w.clients = nil
// Close updater to notify watcherLoop to exit.
// broadcastUpdates notifies the watcher of an update. The watcher loop will
// propagate the notification to watch clients.
func (w *watcher) broadcastUpdates() {
if w.clients != nil {
} else {
vlog.Error("broadcastUpdates() called on a closed watcher")
// watcherLoop implements the goroutine that waits for updates and notifies any
// waiting clients.
func (w *watcher) watcherLoop() {
for {
// If updater has been closed, exit.
if _, ok := <-w.updater; !ok {
for c := range w.clients { // safe for w.clients == nil
// ping writes a signal to a buffered notification channel. If a notification
// is already pending, it is a no-op.
func ping(c chan<- struct{}) {
select {
case c <- struct{}{}: // sent notification
default: // already has notification pending
// closeAndDrain closes a buffered notification channel and drains the buffer
// so that receivers see the closed state sooner.
func closeAndDrain(c chan struct{}) {
for _, ok := <-c; ok; _, ok = <-c {
// no-op
// watchUpdates - see WatchUpdates.
func (w *watcher) watchUpdates(seq uint64) (_ *Client, cancel func()) {
if w.clients == nil {
// watcher is closed. Return stopped Client.
return newStoppedClient(verror.NewErrAborted(nil)), func() {}
// Register and return client.
c := newClient(seq)
w.clients[c] = struct{}{}
cancel = func() {
if _, ok := w.clients[c]; ok { // safe for w.clients == nil
delete(w.clients, c)
return c, cancel
// WatchUpdates returns a Client which supports waiting for changes and
// iterating over the watch log starting from resumeMarker, as well as a
// cancel function which MUST be called to release watch resources.
func (st *Store) WatchUpdates(resumeMarker watch.ResumeMarker) (_ *Client, cancel func()) {
seq, err := parseResumeMarker(string(resumeMarker))
if err != nil {
// resumeMarker is invalid. Return stopped Client.
return newStoppedClient(err), func() {}
return st.watcher.watchUpdates(seq)
// Client encapsulates a channel used to notify watch clients of store updates
// and an iterator over the watch log.
type Client struct {
// Channel used by watcherLoop to notify the client. When the client is
// stopped, update is closed.
update chan struct{}
// Protects the fields below.
mu sync.Mutex
// Sequence number pointing to the start of the previously retrieved log
// batch. Equal to nextSeq if the retrieved batch was empty.
prevSeq uint64
// Sequence number pointing to the start of the next log batch to retrieve.
nextSeq uint64
// When the client is stopped, err is set to the reason for stopping.
err error
func newClient(seq uint64) *Client {
return &Client{
update: make(chan struct{}, 1),
prevSeq: seq,
nextSeq: seq,
func newStoppedClient(err error) *Client {
c := newClient(0)
return c
// Wait returns the update channel that can be used to wait for new changes in
// the store. If the update channel is closed, the client is stopped and no more
// updates will happen. Otherwise, the channel will have a value available
// whenever the store has changed since the last receive on the channel.
func (c *Client) Wait() <-chan struct{} {
return c.update
// NextBatchFromLog returns the next batch of watch log records (transaction)
// from the given database and the resume marker at the end of the batch. If
// there is no batch available, it returns a nil slice and the same resume
// marker as the previous NextBatchFromLog call. The returned log entries are
// guaranteed to point to existing data versions until either the client is
// stopped or NextBatchFromLog is called again. If the client is stopped,
// NextBatchFromLog returns the same error as Err.
func (c *Client) NextBatchFromLog(st store.Store) ([]*LogEntry, watch.ResumeMarker, error) {
if c.err != nil {
return nil, nil, c.err
batch, batchEndSeq, err := readBatchFromLog(st, c.nextSeq)
if err != nil {
// We cannot call stop() here since is locked. However, we checked
// above that c.err is nil, so it is safe to set c.err anc close c.update.
c.err = err
return nil, nil, err
c.prevSeq = c.nextSeq
c.nextSeq = batchEndSeq
return batch, MakeResumeMarker(batchEndSeq), nil
// Err returns the error that caused the client to stop watching. If the error
// is nil, the client is active. Otherwise:
// * ErrCanceled - watch was canceled by the client.
// * ErrAborted - watcher was closed (store was closed, possibly destroyed).
// * ErrUnknownResumeMarker - watch was started with an invalid resume marker.
// * other errors - NextBatchFromLog encountered an error.
func (c *Client) Err() error {
return c.err
// stop closes the client update channel and sets the error returned by Err.
// Idempotent (only the error from the first call to stop is kept).
func (c *Client) stop(err error) {
if c.err == nil {
c.err = err
// GetResumeMarker returns the ResumeMarker that points to the current end
// of the event log.
func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error) {
seq, err := getNextLogSeq(st)
return watch.ResumeMarker(logEntryKey(seq)), err
// MakeResumeMarker converts a sequence number to the resume marker.
func MakeResumeMarker(seq uint64) watch.ResumeMarker {
return watch.ResumeMarker(logEntryKey(seq))
func logEntryKey(seq uint64) string {
// Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
return join(common.LogPrefix, fmt.Sprintf("%016x", seq))
// readBatchFromLog returns a batch of watch log records (a transaction) from
// the given database and the next sequence number at the end of the batch.
func readBatchFromLog(st store.Store, seq uint64) ([]*LogEntry, uint64, error) {
_, scanLimit := common.ScanPrefixArgs(common.LogPrefix, "")
scanStart := MakeResumeMarker(seq)
endOfBatch := false
// Use the store directly to scan these read-only log entries, no need
// to create a snapshot since they are never overwritten. Read and
// buffer a batch before processing it.
var logs []*LogEntry
stream := st.Scan(scanStart, scanLimit)
for stream.Advance() {
var logEnt LogEntry
if err := vom.Decode(stream.Value(nil), &logEnt); err != nil {
return nil, seq, err
logs = append(logs, &logEnt)
// Stop if this is the end of the batch.
if logEnt.Continued == false {
endOfBatch = true
if !endOfBatch {
if err := stream.Err(); err != nil {
return nil, seq, err
if len(logs) > 0 {
vlog.Fatalf("end of batch not found after %d entries", len(logs))
return nil, seq, nil
return logs, seq, nil
func parseResumeMarker(resumeMarker string) (uint64, error) {
// See logEntryKey() for the structure of a resume marker key.
parts := common.SplitNKeyParts(resumeMarker, 2)
if len(parts) != 2 {
return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
seq, err := strconv.ParseUint(parts[1], 16, 64)
if err != nil {
return 0, verror.New(watch.ErrUnknownResumeMarker, nil, resumeMarker)
return seq, nil
// logEntryExists returns true iff the log contains an entry with the given
// sequence number.
func logEntryExists(st store.StoreReader, seq uint64) (bool, error) {
_, err := st.Get([]byte(logEntryKey(seq)), nil)
if err != nil && verror.ErrorID(err) != store.ErrUnknownKey.ID {
return false, err
return err == nil, nil
// getNextLogSeq returns the next sequence number to be used for a new commit.
// NOTE: this function assumes that all sequence numbers in the log represent
// some range [start, limit] without gaps.
func getNextLogSeq(st store.StoreReader) (uint64, error) {
// Determine initial value for seq.
// TODO(sadovsky): Consider using a bigger seq.
// Find the beginning of the log.
it := st.Scan(common.ScanPrefixArgs(common.LogPrefix, ""))
if !it.Advance() {
return 0, nil
defer it.Cancel()
if it.Err() != nil {
return 0, it.Err()
seq, err := parseResumeMarker(string(it.Key(nil)))
if err != nil {
return 0, err
var step uint64 = 1
// Suppose the actual value we are looking for is S. First, we estimate the
// range for S. We find seq, step: seq < S <= seq + step.
for {
if ok, err := logEntryExists(st, seq+step); err != nil {
return 0, err
} else if !ok {
seq += step
step *= 2
// Next we keep the seq < S <= seq + step invariant, reducing step to 1.
for step > 1 {
step /= 2
if ok, err := logEntryExists(st, seq+step); err != nil {
return 0, err
} else if ok {
seq += step
// Now seq < S <= seq + 1, thus S = seq + 1.
return seq + 1, nil