Merge "TBR: mojo.syncbase: Replace V23_ROOT with JIRI_ROOT everywhere."
diff --git a/Makefile b/Makefile
index e737024..b9676d5 100644
--- a/Makefile
+++ b/Makefile
@@ -107,8 +107,8 @@
 	# TODO(nlacasse): Fix dart mojom binding generator so it does not produce
 	# files that violate dartanalyzer.  For now, we use "grep -v" to hide all
 	# hints and warnings from *.mojom.dart files.
-	cd dart && dartanalyzer bin/*.dart lib/*.dart test/*.dart | grep -v "\.mojom\.dart, line"
-	cd sky_demo && dartanalyzer lib/*.dart | grep -v "\.mojom\.dart, line"
+	cd dart && dartanalyzer bin/**/*.dart lib/**/*.dart test/**/*.dart | grep -v "\.mojom\.dart, line"
+	cd sky_demo && dartanalyzer lib/**/*.dart | grep -v "\.mojom\.dart, line"
 
 # Installs dart dependencies.
 dart/packages: dart/pubspec.yaml
diff --git a/README.md b/README.md
index ecaae0e..4f3bc9c 100644
--- a/README.md
+++ b/README.md
@@ -18,11 +18,11 @@
 2. Install [Goma][goma].
 3. Put the following in your `.bashrc`:
 
-       # NOTE: Actual locations depend on where you installed depot_tools and
-       # goma.
-       export PATH=${PATH}:${HOME}/dev/depot_tools
-       export GOMA_DIR=${HOME}/goma
-       export MOJO_DIR=${HOME}/mojo
+        # NOTE: Actual locations depend on where you installed depot_tools and
+        # goma.
+        export PATH=${PATH}:${HOME}/dev/depot_tools
+        export GOMA_DIR=${HOME}/goma
+        export MOJO_DIR=${HOME}/mojo
 
 ### Download Mojo repo
 
@@ -47,21 +47,21 @@
 
 1. Start by updating the repo.
 
-       $ cd $MOJO_DIR/src
-       $ git checkout master
-       $ git pull
-       $ gclient sync
+        $ cd $MOJO_DIR/src
+        $ git checkout master
+        $ git pull
+        $ gclient sync
 
 2. Compile for Linux.  Built resources will be in `$MOJO_DIR/src/out/Debug`
 
-       $ ./mojo/tools/mojob.py gn
-       $ ./mojo/tools/mojob.py build # NOTE: This can take up to 10 minutes.
+        $ ./mojo/tools/mojob.py gn
+        $ ./mojo/tools/mojob.py build # NOTE: This can take up to 10 minutes.
 
 3. Compile for Android.  Built resources will be in
    `$MOJO_DIR/src/out/android_Debug`
 
-       $ ./mojo/tools/mojob.py gn --android
-       $ ./mojo/tools/mojob.py build --android # NOTE: This can take up to 10 minutes.
+        $ ./mojo/tools/mojob.py gn --android
+        $ ./mojo/tools/mojob.py build --android # NOTE: This can take up to 10 minutes.
 
 ## Sky setup
 
diff --git a/dart/lib/src/app.dart b/dart/lib/src/app.dart
index 8ec5313..6e56c3b 100644
--- a/dart/lib/src/app.dart
+++ b/dart/lib/src/app.dart
@@ -9,11 +9,12 @@
   // NOTE(sadovsky): For the Mojo Syncbase service, we only store names from app
   // down - i.e. there is no service name.
   SyncbaseApp._internal(_proxy, relativeName)
-      : super._internal(_proxy, null, relativeName);
+      : super._internal(_proxy, null, relativeName, escape(relativeName));
 
   // noSqlDatabase returns a noSqlDatabase with the given relativeName.
   SyncbaseNoSqlDatabase noSqlDatabase(String relativeName) {
-    return new SyncbaseNoSqlDatabase._internal(_proxy, fullName, relativeName);
+    return new SyncbaseNoSqlDatabase._internal(
+        _proxy, fullName, relativeName, '');
   }
 
   Future create(mojom.Perms perms) async {
diff --git a/dart/lib/src/named_resource.dart b/dart/lib/src/named_resource.dart
index 23058f9..74175c8 100644
--- a/dart/lib/src/named_resource.dart
+++ b/dart/lib/src/named_resource.dart
@@ -6,16 +6,15 @@
 
 // NamedResource is the superclass of resources with names.
 class NamedResource {
-  final String fullName;
-  final String relativeName;
   final mojom.SyncbaseProxy _proxy;
+  final String _parentFullName;
+  final String name;
+  final String fullName;
 
-  NamedResource._internal(
-      mojom.SyncbaseProxy _proxy, String _parentFullName, String relativeName)
+  NamedResource._internal(mojom.SyncbaseProxy _proxy, String _parentFullName,
+      String name, String fullName)
       : this._proxy = _proxy,
-        this.relativeName = relativeName,
-        this.fullName = (_parentFullName == null
-                ? ''
-                : (_parentFullName + '/\$/')) +
-            relativeName {}
+        this._parentFullName = _parentFullName,
+        this.name = name,
+        this.fullName = fullName {}
 }
diff --git a/dart/lib/src/naming/util.dart b/dart/lib/src/naming/util.dart
index f178df8..3b6d0e7 100644
--- a/dart/lib/src/naming/util.dart
+++ b/dart/lib/src/naming/util.dart
@@ -4,8 +4,14 @@
 
 import 'dart:convert' show UTF8;
 
-// TODO(aghassemi): Move these naming utilities outside of Syncbase when we have
-// a Dart Vanadium library
+// TODO(aghassemi): Move these naming utilities out of Syncbase once we
+// implement a Dart Vanadium library.
+
+// TODO(sadovsky): Make this function support more than two input names, and
+// clean the resulting string the same way we do in Go.
+String join(String a, String b) {
+  return a + '/' + b;
+}
 
 // Makes a string representable as a name element by escaping slashes.
 String encodeAsNameElement(String name) {
diff --git a/dart/lib/src/nosql/database.dart b/dart/lib/src/nosql/database.dart
index e3453d7..0288f28 100644
--- a/dart/lib/src/nosql/database.dart
+++ b/dart/lib/src/nosql/database.dart
@@ -6,8 +6,10 @@
 
 // TODO(sadovsky): Add listTables method.
 class SyncbaseNoSqlDatabase extends NamedResource {
-  SyncbaseNoSqlDatabase._internal(_proxy, _parentFullName, relativeName)
-      : super._internal(_proxy, _parentFullName, relativeName);
+  SyncbaseNoSqlDatabase._internal(
+      _proxy, _parentFullName, relativeName, batchSuffix)
+      : super._internal(_proxy, _parentFullName, relativeName,
+            naming.join(_parentFullName, escape(relativeName) + batchSuffix));
 
   // table returns a table with the given relativeName.
   SyncbaseTable table(String relativeName) {
@@ -60,11 +62,39 @@
     return sc.stream;
   }
 
+  Stream<mojom.WatchChange> watch(
+      String tableName, String prefix, List<int> resumeMarker) {
+    StreamController<mojom.WatchChange> sc = new StreamController();
+
+    mojom.WatchGlobStreamStub stub = new mojom.WatchGlobStreamStub.unbound();
+    stub.impl = new WatchGlobStreamImpl._fromStreamController(sc);
+
+    // TODO(aghassemi): Implement naming utilities such as Join in Dart and use them instead.
+    var pattern = tableName + nameSeparator + prefix + '*';
+    var req = new mojom.GlobRequest();
+    req.pattern = pattern;
+    req.resumeMarker = resumeMarker;
+
+    // Call dbWatchGlob asynchronously.
+    _proxy.ptr.dbWatchGlob(fullName, req, stub).then((c) {
+      // TODO(nlacasse): Same question regarding throwing behavior as TableScan.
+      if (isError(c.err)) throw c.err;
+    });
+
+    return sc.stream;
+  }
+
+  Future<List<int>> getResumeMarker() async {
+    var v = await _proxy.ptr.dbGetResumeMarker(fullName);
+    if (isError(v.err)) throw v.err;
+    return v.resumeMarker;
+  }
+
   // TODO(nlacasse): Make a BatchDatabase class similar to what we did in JS.
   Future<String> beginBatch(mojom.BatchOptions opts) async {
     var v = await _proxy.ptr.dbBeginBatch(fullName, opts);
     if (isError(v.err)) throw v.err;
-    return v.batchDn;
+    return v.batchSuffix;
   }
 
   Future commit() async {
@@ -99,7 +129,26 @@
     sc.add(result);
   }
 
-  onDone(mojom.Error err) {
+  // Called by the mojo proxy when the Go function call returns.
+  onReturn(mojom.Error err) {
+    if (isError(err)) {
+      sc.addError(err);
+    }
+    sc.close();
+  }
+}
+
+class WatchGlobStreamImpl implements mojom.WatchGlobStream {
+  final StreamController<mojom.WatchChange> sc;
+  WatchGlobStreamImpl._fromStreamController(this.sc);
+
+  onChange(mojom.WatchChange change) {
+    sc.add(change);
+  }
+
+  // Called by the mojo proxy when the Go function call returns.
+  // Watch technically never returns unless there is an error or it is canceled.
+  onReturn(mojom.Error err) {
     if (isError(err)) {
       sc.addError(err);
     }
diff --git a/dart/lib/src/nosql/row.dart b/dart/lib/src/nosql/row.dart
index 3eeaecf..40ac0e7 100644
--- a/dart/lib/src/nosql/row.dart
+++ b/dart/lib/src/nosql/row.dart
@@ -8,7 +8,8 @@
   final String key;
 
   SyncbaseRow._internal(_proxy, _parentFullName, key)
-      : super._internal(_proxy, _parentFullName, key),
+      : super._internal(_proxy, _parentFullName, relativeName,
+            naming.join(_parentFullName, escape(key))),
         this.key = key;
 
   Future<bool> exists() async {
diff --git a/dart/lib/src/nosql/table.dart b/dart/lib/src/nosql/table.dart
index 619a634..bc671d3 100644
--- a/dart/lib/src/nosql/table.dart
+++ b/dart/lib/src/nosql/table.dart
@@ -6,7 +6,8 @@
 
 class SyncbaseTable extends NamedResource {
   SyncbaseTable._internal(_proxy, _parentFullName, relativeName)
-      : super._internal(_proxy, _parentFullName, relativeName);
+      : super._internal(_proxy, _parentFullName, relativeName,
+            naming.join(_parentFullName, escape(relativeName)));
 
   // row returns a row with the given key.
   SyncbaseRow row(String key) {
@@ -29,6 +30,17 @@
     return v.exists;
   }
 
+  Future<mojom.Perms> getPermissions() async {
+    var v = await _proxy.ptr.tableGetPermissions(fullName);
+    if (isError(v.err)) throw v.err;
+    return v.perms;
+  }
+
+  Future setPermissions(mojom.Perms perms) async {
+    var v = await _proxy.ptr.tableSetPermissions(fullName, perms);
+    if (isError(v.err)) throw v.err;
+  }
+
   Future<List<int>> get(String key) async {
     return this.row(key).get();
   }
@@ -68,19 +80,19 @@
     return sc.stream;
   }
 
-  Future<List<mojom.PrefixPerms>> getPermissions(String key) async {
-    var v = await _proxy.ptr.tableGetPermissions(fullName, key);
+  Future<List<mojom.PrefixPerms>> getPrefixPermissions(String key) async {
+    var v = await _proxy.ptr.tableGetPrefixPermissions(fullName, key);
     if (isError(v.err)) throw v.err;
     return v.permsArr;
   }
 
-  Future setPermissions(String prefix, mojom.Perms perms) async {
-    var v = await _proxy.ptr.tableSetPermissions(fullName, prefix, perms);
+  Future setPrefixPermissions(String prefix, mojom.Perms perms) async {
+    var v = await _proxy.ptr.tableSetPrefixPermissions(fullName, prefix, perms);
     if (isError(v.err)) throw v.err;
   }
 
-  Future deletePermissions(String prefix) async {
-    var v = await _proxy.ptr.tableDeletePermissions(fullName, prefix);
+  Future deletePrefixPermissions(String prefix) async {
+    var v = await _proxy.ptr.tableDeletePrefixPermissions(fullName, prefix);
     if (isError(v.err)) throw v.err;
   }
 }
@@ -93,7 +105,8 @@
     sc.add(keyValue);
   }
 
-  onDone(mojom.Error err) {
+  // Called by the mojom proxy when the Go function call returns.
+  onReturn(mojom.Error err) {
     if (isError(err)) {
       sc.addError(err);
     }
diff --git a/dart/lib/src/nosql/watch_change_types.dart b/dart/lib/src/nosql/watch_change_types.dart
new file mode 100644
index 0000000..8e5b2a5
--- /dev/null
+++ b/dart/lib/src/nosql/watch_change_types.dart
@@ -0,0 +1,8 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+class WatchChangeTypes {
+  static const int put = 0;
+  static const int delete = 1;
+}
diff --git a/dart/lib/src/util.dart b/dart/lib/src/util.dart
new file mode 100644
index 0000000..615b9a9
--- /dev/null
+++ b/dart/lib/src/util.dart
@@ -0,0 +1,19 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+part of syncbase_client;
+
+// escape escapes a component name for use in a Syncbase object name. In
+// particular, it replaces bytes "%" and "/" with the "%" character followed by
+// the byte's two-digit hex code. Clients using the client library need not
+// escape names themselves; the client library does so on their behalf.
+String escape(String s) {
+  return naming.escape(s, '/');
+}
+
+// unescape applies the inverse of escape. Throws exception if the given string
+// is not a valid escaped string.
+String unescape(String s) {
+  return naming.unescape(s);
+}
diff --git a/dart/lib/syncbase_client.dart b/dart/lib/syncbase_client.dart
index bba7982..105ae38 100644
--- a/dart/lib/syncbase_client.dart
+++ b/dart/lib/syncbase_client.dart
@@ -8,6 +8,7 @@
 import 'dart:convert' show UTF8;
 
 import 'package:mojo/bindings.dart' as bindings;
+import 'package:ether/src/naming/util.dart' as naming;
 
 import 'gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart' as mojom;
 
@@ -16,10 +17,13 @@
 // constructors, since the default constructors are not user-friendly.  They
 // take zero arguments, so all fields must be populated with assignments.
 export 'gen/dart-gen/mojom/lib/mojo/syncbase.mojom.dart'
-    show BatchOptions, Perms, SyncGroupMemberInfo, SyncGroupSpec;
+    show BatchOptions, Perms, SyncGroupMemberInfo, SyncGroupSpec, WatchChange;
+
+export 'src/nosql/watch_change_types.dart' show WatchChangeTypes;
 
 part 'src/app.dart';
 part 'src/named_resource.dart';
+part 'src/util.dart';
 part 'src/nosql/database.dart';
 part 'src/nosql/row.dart';
 part 'src/nosql/rowrange.dart';
diff --git a/dart/test/integration/syncbase_database_test.dart b/dart/test/integration/syncbase_database_test.dart
index 279d3fc..bf6b6b8 100644
--- a/dart/test/integration/syncbase_database_test.dart
+++ b/dart/test/integration/syncbase_database_test.dart
@@ -4,9 +4,12 @@
 
 library syncbase_database_test;
 
+import 'dart:convert' show UTF8;
+
 import 'package:test/test.dart';
 
-import 'package:ether/syncbase_client.dart' show SyncbaseClient;
+import 'package:ether/syncbase_client.dart'
+    show SyncbaseClient, WatchChangeTypes, WatchChange;
 
 import './utils.dart' as utils;
 
@@ -16,7 +19,7 @@
     var dbName = utils.uniqueName('db');
     var db = app.noSqlDatabase(dbName);
     expect(db.relativeName, equals(dbName));
-    expect(db.fullName, equals(app.fullName + '/\$/' + dbName));
+    expect(db.fullName, equals(app.fullName + '/' + dbName));
   });
 
   test('creating and destroying a database', () async {
@@ -32,5 +35,61 @@
     expect(await db.exists(), equals(false));
   });
 
+  test('basic watch', () async {
+    var app = c.app(utils.uniqueName('app'));
+    await app.create(utils.emptyPerms());
+    var db = app.noSqlDatabase(utils.uniqueName('db'));
+    await db.create(utils.emptyPerms());
+    var table = db.table(utils.uniqueName('table'));
+    await table.create(utils.emptyPerms());
+
+    // Perform some operations that we won't be watching.
+    await table.put('row1', UTF8.encode('value1'));
+    await table.delete('row1');
+
+    // Start watching everything from now.
+    var resumeMarker = await db.getResumeMarker();
+    var prefix = '';
+    var watchStream = db.watch(table.relativeName, prefix, resumeMarker);
+
+    // Perform some operations while we are watching.
+    var expectedChanges = new List<WatchChange>();
+
+    await table.put('row2', UTF8.encode('value2'));
+    resumeMarker = await db.getResumeMarker();
+    var expectedChange = new WatchChange();
+    expectedChange.tableName = table.relativeName;
+    expectedChange.rowName = 'row2';
+    expectedChange.changeType = WatchChangeTypes.put;
+    expectedChange.valueBytes = UTF8.encode('value2');
+    expectedChange.resumeMarker = resumeMarker;
+    expectedChanges.add(expectedChange);
+
+    await table.delete('row2');
+    resumeMarker = await db.getResumeMarker();
+    expectedChange = new WatchChange();
+    expectedChange.tableName = table.relativeName;
+    expectedChange.rowName = 'row2';
+    expectedChange.changeType = WatchChangeTypes.delete;
+    expectedChange.valueBytes = new List<int>();
+    expectedChange.resumeMarker = resumeMarker;
+    expectedChanges.add(expectedChange);
+
+    // Ensure we see all the expected changes in order in the watch stream.
+    var changeNum = 0;
+    await for (var change in watchStream) {
+      // Classes generated by the mojom Dart compiler do not override == and
+      // hashCode, but they do override toString to print all properties, so
+      // we use toString to assert equality.
+      expect(change.toString(), equals(expectedChanges[changeNum].toString()));
+      changeNum++;
+      // We need to break out of the await-for loop once we have received every expected change,
+      // because by design the watch stream does not end until canceled, and we don't have a canceling mechanism yet.
+      if (changeNum == expectedChanges.length) {
+        break;
+      }
+    }
+  });
+
   // TODO(nlacasse): Test database.get/setPermissions.
 }
diff --git a/dart/test/integration/syncbase_row_test.dart b/dart/test/integration/syncbase_row_test.dart
index 3b32141..7397f2c 100644
--- a/dart/test/integration/syncbase_row_test.dart
+++ b/dart/test/integration/syncbase_row_test.dart
@@ -22,7 +22,7 @@
     var row = table.row(rowName);
 
     expect(row.relativeName, equals(rowName));
-    expect(row.fullName, equals(table.fullName + '/\$/' + rowName));
+    expect(row.fullName, equals(table.fullName + '/' + rowName));
   });
 
   test('putting, getting and deleting row', () async {
diff --git a/dart/test/integration/syncbase_table_test.dart b/dart/test/integration/syncbase_table_test.dart
index de8b6d0..a590885 100644
--- a/dart/test/integration/syncbase_table_test.dart
+++ b/dart/test/integration/syncbase_table_test.dart
@@ -19,7 +19,7 @@
     var tableName = utils.uniqueName('table');
     var table = db.table(tableName);
     expect(table.relativeName, equals(tableName));
-    expect(table.fullName, equals(db.fullName + '/\$/' + tableName));
+    expect(table.fullName, equals(db.fullName + '/' + tableName));
   });
 
   test('creating and destroying a table', () async {
diff --git a/mojom/syncbase.mojom b/mojom/syncbase.mojom
index 8d4a739..0930c0a 100644
--- a/mojom/syncbase.mojom
+++ b/mojom/syncbase.mojom
@@ -48,7 +48,7 @@
 
 interface ExecStream {
   OnResult(Result result);
-  OnDone(Error err);
+  OnReturn(Error err);
 };
 
 struct KeyValue {
@@ -58,7 +58,27 @@
 
 interface ScanStream {
   OnKeyValue(KeyValue key_value);
-  OnDone(Error err);
+  OnReturn(Error err);
+};
+
+struct WatchChange {
+  string table_name;
+  string row_name;
+  uint32 change_type;
+  array<uint8> value_bytes;
+  array<uint8> resume_marker;
+  bool from_sync;
+  bool continued;
+};
+
+struct GlobRequest {
+  string pattern;
+  array<uint8> resume_marker;
+};
+
+interface WatchGlobStream {
+  OnChange(WatchChange change);
+  OnReturn(Error err);
 };
 
 // TODO(sadovsky): Add schema version to all RPCs. See v.io/c/13734.
@@ -96,6 +116,8 @@
   DbAbort(string name) => (Error err);
   DbGetPermissions(string name) => (Error err, Perms perms, string version);
   DbSetPermissions(string name, Perms perms, string version) => (Error err);
+  DbWatchGlob(string name, GlobRequest req, WatchGlobStream stream) => (Error err);
+  DbGetResumeMarker(string name) => (Error err, array<uint8> resumeMarker);
 
   // TODO(sadovsky): Add DatabaseWatcher, BlobManager, and SchemaManager
   // methods.
@@ -128,15 +150,17 @@
   TableCreate(string name, Perms perms) => (Error err);
   TableDestroy(string name) => (Error err);
   TableExists(string name) => (Error err, bool exists);
+  TableGetPermissions(string name) => (Error err, Perms perms);
+  TableSetPermissions(string name, Perms perms) => (Error err);
   TableDeleteRange(string name, array<uint8> start, array<uint8> limit)
       => (Error err);
   TableScan(
       string name, array<uint8> start, array<uint8> limit, ScanStream stream)
       => (Error err);
-  TableGetPermissions(string name, string key)
+  TableGetPrefixPermissions(string name, string key)
       => (Error err, array<PrefixPerms> perms_arr);
-  TableSetPermissions(string name, string prefix, Perms perms) => (Error err);
-  TableDeletePermissions(string name, string prefix) => (Error err);
+  TableSetPrefixPermissions(string name, string prefix, Perms perms) => (Error err);
+  TableDeletePrefixPermissions(string name, string prefix) => (Error err);
 
   ////////////////////////////////////////
   // nosql.Row