Implement the client watch.
The client watch handler subscribes to store update notifications via a
Go channel. The channel carries only a single bit of information: "there
is a watch log update". Whenever the handler receives a notification, it
scans the watch log for new records (as the sync watcher does), filters
out unnecessary or inaccessible log records, and sends the result to the
client.
Change-Id: I143c1228d3b0808f1910c9b6dc17b19758dcf1de
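
For reference, below is a minimal standalone Go sketch of the notification
pattern described above (illustrative only, not part of this change; all
names are made up): a worker goroutine translates condition-variable
broadcasts into sends on a Go channel, and the watch loop scans its log
whenever it receives a notification.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // notifier mirrors the watcher added in this change: the state starts
    // at 1 and is bumped by 1 on every update.
    type notifier struct {
        mu    sync.Mutex
        cond  *sync.Cond
        state uint64
    }

    func newNotifier() *notifier {
        n := &notifier{state: 1}
        n.cond = sync.NewCond(&n.mu)
        return n
    }

    // bump records that new data is available and wakes all waiters.
    func (n *notifier) bump() {
        n.mu.Lock()
        n.state++
        n.cond.Broadcast()
        n.mu.Unlock()
    }

    // wait blocks until the state differs from the given one and returns
    // the new state.
    func (n *notifier) wait(state uint64) uint64 {
        n.mu.Lock()
        defer n.mu.Unlock()
        for n.state == state {
            n.cond.Wait()
        }
        return n.state
    }

    func main() {
        n := newNotifier()
        // The channel carries no data; it only signals "there are updates".
        hasUpdates := make(chan struct{}, 1)
        // Worker: translate condition-variable broadcasts into channel sends.
        go func() {
            state := uint64(1)
            for {
                state = n.wait(state)
                select {
                case hasUpdates <- struct{}{}:
                default: // a notification is already pending; coalesce
                }
            }
        }()
        // A committed transaction would trigger this in the real code.
        go func() {
            time.Sleep(10 * time.Millisecond)
            n.bump()
        }()
        <-hasUpdates
        fmt.Println("watch log has new records; scan it from the resume marker")
    }
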
diff --git a/services/syncbase/server/nosql/database_watch.go b/services/syncbase/server/nosql/database_watch.go
index 951444d..35228e6 100644
--- a/services/syncbase/server/nosql/database_watch.go
+++ b/services/syncbase/server/nosql/database_watch.go
@@ -5,30 +5,213 @@
package nosql
import (
- nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
+ "bytes"
+ "strings"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ pubutil "v.io/syncbase/v23/syncbase/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/services/watch"
+ "v.io/v23/vdl"
"v.io/v23/verror"
)
-// WatchGlob implements the nosqlwire.DatabaseWatcher interface.
+// GetResumeMarker implements the wire.DatabaseWatcher interface.
+func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
+ if !d.exists {
+ return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+ }
+ if d.batchId != nil {
+ return watchable.GetResumeMarker(d.batchReader())
+ } else {
+ return watchable.GetResumeMarker(d.st)
+ }
+}
+
+// WatchGlob implements the wire.DatabaseWatcher interface.
func (d *databaseReq) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
- // TODO(rogulenko): Implement.
+ // TODO(rogulenko): Check permissions here and in other methods.
if !d.exists {
return verror.New(verror.ErrNoExist, ctx, d.name)
}
if d.batchId != nil {
- return nosqlwire.NewErrBoundToBatch(ctx)
+ return wire.NewErrBoundToBatch(ctx)
}
- return verror.NewErrNotImplemented(ctx)
+ // Parse the pattern.
+ if !strings.HasSuffix(req.Pattern, "*") {
+ return verror.New(verror.ErrBadArg, ctx, req.Pattern)
+ }
+ table, prefix, err := pubutil.ParseTableRowPair(ctx, strings.TrimSuffix(req.Pattern, "*"))
+ if err != nil {
+ return err
+ }
+ // Get the resume marker and fetch the initial state if necessary.
+ resumeMarker := req.ResumeMarker
+ if bytes.Equal(resumeMarker, []byte("now")) || len(resumeMarker) == 0 {
+ var err error
+ if resumeMarker, err = watchable.GetResumeMarker(d.st); err != nil {
+ return err
+ }
+ if len(req.ResumeMarker) == 0 {
+ // TODO(rogulenko): Fetch the initial state.
+ return verror.NewErrNotImplemented(ctx)
+ }
+ }
+ t := tableReq{
+ name: table,
+ d: d,
+ }
+ return t.watchUpdates(ctx, call, prefix, resumeMarker)
}
-// GetResumeMarker implements the nosqlwire.DatabaseWatcher interface.
-func (d *databaseReq) GetResumeMarker(ctx *context.T, call rpc.ServerCall) (watch.ResumeMarker, error) {
- // TODO(rogulenko): Implement.
- if !d.exists {
- return nil, verror.New(verror.ErrNoExist, ctx, d.name)
+// watchUpdates waits for database updates and sends them to the client.
+// This function repeats two steps in a loop:
+// - scan through the watch log to the end, sending all updates to the client;
+// - wait for one of two signals: new updates are available or the call is canceled.
+// The 'new updates' signal is sent by a worker goroutine that translates a
+// condition variable signal to a Go channel. The worker goroutine waits on the
+// condition variable for changes. Whenever the state changes, the worker sends
+// a signal through the Go channel.
+func (t *tableReq) watchUpdates(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, prefix string, resumeMarker watch.ResumeMarker) error {
+ // The Go channel to send notifications from the worker to the main
+ // goroutine.
+ hasUpdates := make(chan struct{})
+ // The Go channel to signal the worker to stop. The worker might block
+ // on the condition variable, but we don't want the main goroutine
+ // to wait for the worker to stop, so we create a buffered channel.
+ cancelWorker := make(chan struct{}, 1)
+ defer close(cancelWorker)
+ go func() {
+ waitForChange := watchable.WatchUpdates(t.d.st)
+ var state, newState uint64 = 1, 1
+ for {
+ // Wait until the state changes or the main function returns.
+ for newState == state {
+ select {
+ case <-cancelWorker:
+ return
+ default:
+ }
+ newState = waitForChange(state)
+ }
+ // Update the current state to the new value and send a signal to
+ // the main goroutine.
+ state = newState
+ if state == 0 {
+ close(hasUpdates)
+ return
+ }
+ // cancelWorker is closed as soon as the main function returns.
+ select {
+ case hasUpdates <- struct{}{}:
+ case <-cancelWorker:
+ return
+ }
+ }
+ }()
+
+ sender := call.SendStream()
+ for {
+ // Drain the log queue.
+ for {
+ logs, nextResumeMarker, err := watchable.ReadBatchFromLog(t.d.st, resumeMarker)
+ if err != nil {
+ return err
+ }
+ if logs == nil {
+ // No new log records available now.
+ break
+ }
+ resumeMarker = nextResumeMarker
+ changes, err := t.processLogBatch(ctx, call, prefix, logs)
+ if err != nil {
+ return err
+ }
+ if changes == nil {
+ // All batch changes are filtered out.
+ continue
+ }
+ changes[len(changes)-1].ResumeMarker = resumeMarker
+ for _, change := range changes {
+ if err := sender.Send(change); err != nil {
+ return err
+ }
+ }
+ }
+ // Wait for new updates or for the call to be canceled.
+ select {
+ case _, ok := <-hasUpdates:
+ if !ok {
+ return verror.NewErrAborted(ctx)
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- return nil, verror.NewErrNotImplemented(ctx)
+}
+
+// processLogBatch converts []*watchable.LogEntry to []watch.Change, filtering
+// out unnecessary or inaccessible log records.
+func (t *tableReq) processLogBatch(ctx *context.T, call rpc.ServerCall, prefix string, logs []*watchable.LogEntry) ([]watch.Change, error) {
+ sn := t.d.st.NewSnapshot()
+ defer sn.Abort()
+ var changes []watch.Change
+ for _, logEntry := range logs {
+ var opKey string
+ switch op := logEntry.Op.(type) {
+ case watchable.OpPut:
+ opKey = string(op.Value.Key)
+ case watchable.OpDelete:
+ opKey = string(op.Value.Key)
+ default:
+ continue
+ }
+ parts := util.SplitKeyParts(opKey)
+ // TODO(rogulenko): Currently we process only rows, i.e. keys of the form
+ // $row:xxx:yyy. Consider processing other keys.
+ if len(parts) != 3 || parts[0] != util.RowPrefix {
+ continue
+ }
+ table, row := parts[1], parts[2]
+ // Filter out unnecessary rows and rows that we can't access.
+ if table != t.name || !strings.HasPrefix(row, prefix) {
+ continue
+ }
+ if err := t.checkAccess(ctx, call, sn, row); err != nil {
+ if verror.ErrorID(err) != verror.ErrNoAccess.ID {
+ return nil, err
+ }
+ continue
+ }
+ change := watch.Change{
+ Name: naming.Join(table, row),
+ Continued: true,
+ }
+ switch op := logEntry.Op.(type) {
+ case watchable.OpPut:
+ rowValue, err := watchable.GetAtVersion(ctx, sn, op.Value.Key, nil, op.Value.Version)
+ if err != nil {
+ return nil, err
+ }
+ change.State = watch.Exists
+ change.Value = vdl.ValueOf(wire.StoreChange{
+ Value: rowValue,
+ FromSync: logEntry.FromSync,
+ })
+ case watchable.OpDelete:
+ change.State = watch.DoesNotExist
+ change.Value = vdl.ValueOf(wire.StoreChange{
+ FromSync: logEntry.FromSync,
+ })
+ }
+ changes = append(changes, change)
+ }
+ if len(changes) > 0 {
+ changes[len(changes)-1].Continued = false
+ }
+ return changes, nil
}
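
As in the code above, all changes from one log batch (one transaction) are
sent with Continued set to true except the last change, which also carries
the new ResumeMarker. A hedged sketch of how a consumer might regroup the
flat change stream into atomic batches (the package and helper names are
illustrative, not part of this change; the watch import path is the one
used above):

    package watchclient

    import "v.io/v23/services/watch"

    // groupIntoBatches splits a flat sequence of changes into atomic
    // batches: a batch ends at the first change with Continued == false.
    func groupIntoBatches(changes []watch.Change) [][]watch.Change {
        var batches [][]watch.Change
        var cur []watch.Change
        for _, c := range changes {
            cur = append(cur, c)
            if !c.Continued {
                batches = append(batches, cur)
                cur = nil
            }
        }
        // Any changes left over belong to a batch that is still in progress.
        if len(cur) > 0 {
            batches = append(batches, cur)
        }
        return batches
    }
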
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 010643d..72eebf2 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -16,12 +16,6 @@
// where <key> is the client-specified key.
package watchable
-// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
-// the design for how to expose a "watch" API from the storage engine, and we
-// don't want to write lots of tests prematurely.)
-// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
-// TODO(sadovsky): Allow clients to subscribe via Go channel.
-
import (
"fmt"
"strings"
@@ -51,25 +45,28 @@
return nil, err
}
return &wstore{
- ist: st,
- opts: opts,
- seq: seq,
- clock: clock.NewVClock(),
+ ist: st,
+ watcher: newWatcher(),
+ opts: opts,
+ seq: seq,
+ clock: clock.NewVClock(),
}, nil
}
type wstore struct {
- ist store.Store
- opts *Options
- mu sync.Mutex // held during transaction commits; protects seq
- seq uint64 // the next sequence number to be used for a new commit
- clock *clock.VClock // used to provide write timestamps
+ ist store.Store
+ watcher *watcher
+ opts *Options
+ mu sync.Mutex // held during transaction commits; protects seq
+ seq uint64 // the next sequence number to be used for a new commit
+ clock *clock.VClock // used to provide write timestamps
}
var _ Store = (*wstore)(nil)
// Close implements the store.Store interface.
func (st *wstore) Close() error {
+ st.watcher.close()
return st.ist.Close()
}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 304c4d6..0419e7c 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -158,6 +158,7 @@
return err
}
tx.st.seq = seq
+ tx.st.watcher.broadcastUpdates()
return nil
}
diff --git a/services/syncbase/server/watchable/watcher.go b/services/syncbase/server/watchable/watcher.go
index 1209a1e..fc0481a 100644
--- a/services/syncbase/server/watchable/watcher.go
+++ b/services/syncbase/server/watchable/watcher.go
@@ -7,6 +7,7 @@
import (
"fmt"
"strconv"
+ "sync"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
@@ -16,6 +17,79 @@
"v.io/x/lib/vlog"
)
+// watcher maintains a state and a condition variable. The watcher sends
+// a broadcast signal every time the state changes. The state is increased
+// by 1 every time the store has new data. Initially the state equals 1.
+// If the state becomes 0, the watcher is closed and the state will never
+// change again.
+// TODO(rogulenko): Broadcast a signal from time to time to unblock waiting
+// clients.
+type watcher struct {
+ mu *sync.RWMutex
+ cond *sync.Cond
+ state uint64
+}
+
+func newWatcher() *watcher {
+ mu := &sync.RWMutex{}
+ return &watcher{
+ mu: mu,
+ cond: sync.NewCond(mu.RLocker()),
+ state: 1,
+ }
+}
+
+// close closes the watcher.
+func (w *watcher) close() {
+ w.mu.Lock()
+ w.state = 0
+ w.cond.Broadcast()
+ w.mu.Unlock()
+}
+
+// broadcastUpdates broadcasts the update notification to watch clients.
+func (w *watcher) broadcastUpdates() {
+ w.mu.Lock()
+ if w.state != 0 {
+ w.state++
+ w.cond.Broadcast()
+ } else {
+ vlog.Error("broadcastUpdates() called on a closed watcher")
+ }
+ w.mu.Unlock()
+}
+
+// WatchUpdates returns a function that can be used to watch for changes to
+// the database. The store maintains a state (initially 1) that is increased
+// by 1 every time the store has new data. The returned waitForChange function
+// takes the last state it returned and blocks until the state changes,
+// returning the new state. A state of 0 means the store is closed and no more
+// updates will arrive. If waitForChange is passed a state different from the
+// current state of the store, or the store is closed, it returns immediately.
+// waitForChange may also return a non-zero state equal to the state passed as
+// its argument; this behavior helps to unblock clients when the store has had
+// no updates for a long period of time.
+func WatchUpdates(st store.Store) (waitForChange func(state uint64) uint64) {
+ // TODO(rogulenko): Remove dynamic type assertion here and in other places.
+ watcher := st.(*wstore).watcher
+ return func(state uint64) uint64 {
+ watcher.cond.L.Lock()
+ defer watcher.cond.L.Unlock()
+ if watcher.state != 0 && watcher.state == state {
+ watcher.cond.Wait()
+ }
+ return watcher.state
+ }
+}
+
+// GetResumeMarker returns the ResumeMarker that points to the current end
+// of the event log.
+func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error) {
+ seq, err := getNextLogSeq(st)
+ return watch.ResumeMarker(logEntryKey(seq)), err
+}
+
// MakeResumeMarker converts a sequence number to the resume marker.
func MakeResumeMarker(seq uint64) watch.ResumeMarker {
return watch.ResumeMarker(logEntryKey(seq))
@@ -27,9 +101,9 @@
return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
}
-// WatchLogBatch returns a batch of watch log records (a transaction) from
+// ReadBatchFromLog returns a batch of watch log records (a transaction) from
// the given database and the new resume marker at the end of the batch.
-func WatchLogBatch(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
+func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
seq, err := parseResumeMarker(string(resumeMarker))
if err != nil {
return nil, resumeMarker, err
diff --git a/services/syncbase/server/watchable/watcher_test.go b/services/syncbase/server/watchable/watcher_test.go
index 246bc65..c978123 100644
--- a/services/syncbase/server/watchable/watcher_test.go
+++ b/services/syncbase/server/watchable/watcher_test.go
@@ -45,7 +45,7 @@
var seq uint64
for i := 0; i < (numTx + 3); i++ {
- logs, newResmark, err := WatchLogBatch(st, resmark)
+ logs, newResmark, err := ReadBatchFromLog(st, resmark)
if err != nil {
t.Fatalf("can't get watch log batch: %v", err)
}
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 04c3bed..ae69dc6 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -123,7 +123,7 @@
s.initDbSyncStateInMem(ctx, appName, dbName)
// Get a batch of watch log entries, if any, after this resume marker.
- logs, nextResmark, err := watchable.WatchLogBatch(st, resMark)
+ logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
if err != nil {
vlog.Fatalf("sync: processDatabase: %s, %s: cannot get watch log batch: %v", appName, dbName, verror.DebugString(err))
}