blob: d94ac4f2b2508faba96c79df6f2646de33c3d5e6 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client
import (
	"sort"
	"strings"
	"sync"

	"v.io/v23/context"
	wire "v.io/v23/services/syncbase"
	"v.io/v23/services/watch"
	"v.io/v23/syncbase"
	"v.io/v23/syncbase/util"
	"v.io/v23/verror"
	"v.io/x/ref/services/syncbase/longevity_tests/model"
)
// Watcher is a client that watches a range of keys in a set of database collections.
// Start opens one watch stream per collection; Stop closes stopChan, which
// leads to those streams being cancelled, and reports the first recorded error.
type Watcher struct {
// Prefix to watch. Defaults to empty string.
// TODO(nlacasse): Allow different prefixes per collection?
Prefix string
// ResumeMarker to begin watching from.
// TODO(nlacasse): Allow different ResumeMarkers per collection?
ResumeMarker watch.ResumeMarker
// OnChange runs for each WatchChange. Must be goroutine-safe, because it is
// invoked from a separate goroutine per watched collection. When nil,
// changes are still read from the stream but are otherwise ignored.
OnChange func(syncbase.WatchChange)
// ctx is the context passed to the most recent call to Start.
ctx *context.T
// Map of databases and their respective collections. Assigned by Start from
// the result of CreateDbsAndCollections.
dbColMap map[syncbase.Database][]syncbase.Collection
// stopChan is created by Start and closed by Stop to signal shutdown.
stopChan chan struct{}
// err holds the first non-nil error passed to setError. Start resets it to
// nil and Stop returns it.
err error
// errMu is held by setError while it inspects and assigns err.
errMu sync.Mutex
// wg waits until all spawned goroutines stop.
wg sync.WaitGroup
}
// Compile-time check that *Watcher satisfies the Client interface.
var _ Client = (*Watcher)(nil)
// String returns an identifier for the watcher: the literal "Watcher"
// followed by the name of every database in dbColMap, joined with "-".
//
// The database names are sorted so the result is stable across calls. Go does
// not define map iteration order, so without sorting the same watcher could
// report its databases in a different order each time.
//
// NOTE(review): dbColMap is assigned from the goroutine spawned by Start
// without synchronization, so calling String while Start is still setting up
// looks racy — confirm callers only invoke it once setup has finished.
func (w *Watcher) String() string {
	parts := make([]string, 0, len(w.dbColMap)+1)
	parts = append(parts, "Watcher")
	for db := range w.dbColMap {
		parts = append(parts, db.Id().Name)
	}
	// Sort only the database names; the "Watcher" prefix stays first.
	sort.Strings(parts[1:])
	return strings.Join(parts, "-")
}
// Start records ctx, clears any previously recorded error, creates a fresh
// stop channel, and launches a background goroutine that:
//
//  1. calls CreateDbsAndCollections for sbName and dbModels,
//  2. opens one watch stream per collection, filtered by w.Prefix and starting
//     at w.ResumeMarker, with a dedicated goroutine draining each stream, and
//  3. blocks until Stop closes the stop channel, then cancels every stream.
//
// Start itself returns immediately. Setup and stream errors are not returned
// here; they are recorded via setError and surfaced by Stop.
func (w *Watcher) Start(ctx *context.T, sbName string, dbModels model.DatabaseSet) {
	w.ctx = ctx
	w.err = nil
	w.stopChan = make(chan struct{})

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()

		dbs, _, setupErr := CreateDbsAndCollections(ctx, sbName, dbModels)
		w.dbColMap = dbs
		if setupErr != nil {
			w.setError(setupErr)
			return
		}

		for database, collections := range dbs {
			for _, collection := range collections {
				// Open a watch stream scoped to this collection and prefix.
				// TODO(ivanpi): Simplify now that Watch can span collections.
				patterns := []wire.CollectionRowPattern{
					util.RowPrefixPattern(collection.Id(), w.Prefix),
				}
				stream := database.Watch(ctx, w.ResumeMarker, patterns)
				// Deferred to the end of this outer goroutine on purpose: the
				// stream must stay open until the stop channel is closed.
				defer stream.Cancel()

				// Drain the stream in its own goroutine, handing each change
				// to OnChange when one is configured.
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					for stream.Advance() {
						change := stream.Change()
						if w.OnChange != nil {
							w.OnChange(change)
						}
					}
					// Advance returned false. A cancellation error is the
					// normal shutdown path; anything else is recorded.
					streamErr := stream.Err()
					if streamErr == nil || verror.ErrorID(streamErr) == verror.ErrCanceled.ID {
						return
					}
					w.setError(streamErr)
				}()
			}
		}

		// Park here until Stop closes the channel; returning then triggers
		// the deferred stream cancellations above.
		<-w.stopChan
	}()
}
// Stop shuts the watcher down and returns the first error recorded since
// Start, or nil if none was recorded. It blocks until every goroutine tracked
// by wg has finished.
func (w *Watcher) Stop() error {
	// Closing the channel releases the goroutine launched by Start, which
	// then runs its deferred stream cancellations.
	close(w.stopChan)

	// Once Wait returns, no tracked goroutine is left to call setError, so
	// err is read here without taking errMu.
	w.wg.Wait()
	return w.err
}
// setError records err as the watcher's error if it is non-nil and no error
// has been recorded yet; later errors are dropped. It holds errMu for the
// duration of the check and assignment.
func (w *Watcher) setError(err error) {
	w.errMu.Lock()
	defer w.errMu.Unlock()

	// Keep only the first error: nothing to do for nil or when one is set.
	if err == nil || w.err != nil {
		return
	}
	w.err = err
}