the client watch impl

The client subscribes to store update notifications via a Go
channel. The channel carries no payload; it is only used to
signal that there is a watch log update. Whenever the notification
is seen, the watch log is scanned for new updates (as the sync
watcher does), unnecessary or inaccessible log records are
filtered out, and the result is sent to the client.

Change-Id: I143c1228d3b0808f1910c9b6dc17b19758dcf1de
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 010643d..72eebf2 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -16,12 +16,6 @@
 // where <key> is the client-specified key.
 package watchable
 
-// TODO(sadovsky): Write unit tests. (As of May 2015 we're still iterating on
-// the design for how to expose a "watch" API from the storage engine, and we
-// don't want to write lots of tests prematurely.)
-// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
-// TODO(sadovsky): Allow clients to subscribe via Go channel.
-
 import (
 	"fmt"
 	"strings"
@@ -51,25 +45,28 @@
 		return nil, err
 	}
 	return &wstore{
-		ist:   st,
-		opts:  opts,
-		seq:   seq,
-		clock: clock.NewVClock(),
+		ist:     st,
+		watcher: newWatcher(),
+		opts:    opts,
+		seq:     seq,
+		clock:   clock.NewVClock(),
 	}, nil
 }
 
 type wstore struct {
-	ist   store.Store
-	opts  *Options
-	mu    sync.Mutex    // held during transaction commits; protects seq
-	seq   uint64        // the next sequence number to be used for a new commit
-	clock *clock.VClock // used to provide write timestamps
+	ist     store.Store
+	watcher *watcher
+	opts    *Options
+	mu      sync.Mutex    // held during transaction commits; protects seq
+	seq     uint64        // the next sequence number to be used for a new commit
+	clock   *clock.VClock // used to provide write timestamps
 }
 
 var _ Store = (*wstore)(nil)
 
 // Close implements the store.Store interface.
 func (st *wstore) Close() error {
+	st.watcher.close()
 	return st.ist.Close()
 }
 
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 304c4d6..0419e7c 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -158,6 +158,7 @@
 		return err
 	}
 	tx.st.seq = seq
+	tx.st.watcher.broadcastUpdates()
 	return nil
 }
 
diff --git a/services/syncbase/server/watchable/watcher.go b/services/syncbase/server/watchable/watcher.go
index 1209a1e..fc0481a 100644
--- a/services/syncbase/server/watchable/watcher.go
+++ b/services/syncbase/server/watchable/watcher.go
@@ -7,6 +7,7 @@
 import (
 	"fmt"
 	"strconv"
+	"sync"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
@@ -16,6 +17,79 @@
 	"v.io/x/lib/vlog"
 )
 
+// watcher maintains a state counter and a condition variable. The watcher
+// broadcasts on the condition variable every time the state changes. The
+// state starts at 1 and is incremented by 1 every time the store has new
+// data. A state of 0 means the watcher is closed; once closed, the state is
+// never changed again.
+// TODO(rogulenko): Broadcast a signal from time to time to unblock waiting
+// clients.
+type watcher struct {
+	mu    *sync.RWMutex // write-locked by close and broadcastUpdates while changing state
+	cond  *sync.Cond    // created over mu.RLocker(), so waiters hold only the read lock
+	state uint64        // 0 = closed; otherwise starts at 1 and grows by 1 per update
+}
+
+// newWatcher returns an open watcher: its state starts at 1 and its condition
+// variable is built on the read side of a fresh RWMutex, so goroutines waiting
+// on the condition hold only the read lock.
+func newWatcher() *watcher {
+	w := &watcher{mu: new(sync.RWMutex), state: 1}
+	w.cond = sync.NewCond(w.mu.RLocker())
+	return w
+}
+
+// close marks the watcher closed (state 0) and wakes up every waiter.
+func (w *watcher) close() {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.state = 0
+	w.cond.Broadcast()
+}
+
+// broadcastUpdates notifies watch clients that the store has new data.
+func (w *watcher) broadcastUpdates() {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.state == 0 { // already closed: report it and leave the state at 0
+		vlog.Error("broadcastUpdates() called on a closed watcher")
+		return
+	}
+	w.state++
+	w.cond.Broadcast()
+}
+
+// WatchUpdates returns a waitForChange function for watching the database
+// for changes. The store keeps a state counter that starts at 1 and grows by
+// 1 every time the store has new data; a state of 0 means the store is closed
+// and no further updates will come. waitForChange takes the state the caller
+// last saw and blocks until it is woken up, then returns the current state.
+// It returns immediately, without blocking, if the given state differs from
+// the current state or if the store is closed. The returned state may be
+// non-zero and equal to the argument; this helps unblock clients when the
+// store has no updates for a long period of time.
+// WatchUpdates panics if st is not a *wstore.
+func WatchUpdates(st store.Store) (waitForChange func(state uint64) uint64) {
+	// TODO(rogulenko): Remove dynamic type assertion here and in other places.
+	w := st.(*wstore).watcher
+	return func(state uint64) uint64 {
+		w.cond.L.Lock()
+		defer w.cond.L.Unlock()
+		// A single check rather than a loop: any wake-up returns to the caller.
+		if w.state == state && w.state != 0 {
+			w.cond.Wait()
+		}
+		return w.state
+	}
+}
+
+// GetResumeMarker returns the ResumeMarker for the current end of the event log.
+func GetResumeMarker(st store.StoreReader) (watch.ResumeMarker, error) {
+	nextSeq, err := getNextLogSeq(st)
+	marker := watch.ResumeMarker(logEntryKey(nextSeq))
+	return marker, err // the marker is returned even when err is non-nil
+}
+
 // MakeResumeMarker converts a sequence number to the resume marker.
 func MakeResumeMarker(seq uint64) watch.ResumeMarker {
 	return watch.ResumeMarker(logEntryKey(seq))
@@ -27,9 +101,9 @@
 	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
 }
 
-// WatchLogBatch returns a batch of watch log records (a transaction) from
+// ReadBatchFromLog returns a batch of watch log records (a transaction) from
 // the given database and the new resume marker at the end of the batch.
-func WatchLogBatch(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
+func ReadBatchFromLog(st store.Store, resumeMarker watch.ResumeMarker) ([]*LogEntry, watch.ResumeMarker, error) {
 	seq, err := parseResumeMarker(string(resumeMarker))
 	if err != nil {
 		return nil, resumeMarker, err
diff --git a/services/syncbase/server/watchable/watcher_test.go b/services/syncbase/server/watchable/watcher_test.go
index 246bc65..c978123 100644
--- a/services/syncbase/server/watchable/watcher_test.go
+++ b/services/syncbase/server/watchable/watcher_test.go
@@ -45,7 +45,7 @@
 	var seq uint64
 
 	for i := 0; i < (numTx + 3); i++ {
-		logs, newResmark, err := WatchLogBatch(st, resmark)
+		logs, newResmark, err := ReadBatchFromLog(st, resmark)
 		if err != nil {
 			t.Fatalf("can't get watch log batch: %v", err)
 		}