mojo/syncbase: Fixing handle leaks and forcing
strict mode for tests.

We were leaking mojo handles for proxies that
are created for Watch, Exec and Scan streams.
These proxies are created so Go code can call
into Dart and the handles are not tied to the
client proxy that is used to call Go code from
Dart and therefore they need to be tracked and
closed manually and neither mojo nor generated
mojom code knows about their existence.

This CL introduces a UnclosedStubsManager which
methods like Watch, Scan, Exec add their stubs
to and remove it when they finish. Any stubs that
are still open when SyncbaseClient.close() is called
are then closed preventing handle leaks.

This CL also refactors and bundles Proxy and
UnclosedStubsManager into a single ClientContext
that is passed from client to layers that need it.

NOTE: Currently we can not actually test for leaks
due to the bug https://github.com/domokit/mojo/issues/487
which is crashing the test runner when `app.close` is
called. Without `app.close()` being called before checking
for leaks, some handles will always leak. For now I have
verified leaks are down to 2 which are unrelated to our code
and are due to `app.close()` not being called but this
is not good, and hopefully that bug gets fixes soon so
we can enable the leak check.

Also note that `reportLeakedHandles` does not work in
non-strict mode so we were never actually testing for
leaks.

Change-Id: Ie00442bec5156fd1e8ecb8afb90a3479d66c7189
diff --git a/example/syncbase_example.dart b/example/syncbase_example.dart
index 7e8b419..bfaef13 100644
--- a/example/syncbase_example.dart
+++ b/example/syncbase_example.dart
@@ -7,7 +7,8 @@
 import 'dart:math' show Random;
 import 'dart:convert' show UTF8;
 
-import 'package:syncbase/initialized_application.dart' show InitializedApplication;
+import 'package:syncbase/initialized_application.dart'
+    show InitializedApplication;
 import 'package:syncbase/src/naming/util.dart' as naming;
 import 'package:syncbase/syncbase_client.dart' as sb;
 
@@ -121,7 +122,8 @@
 
 Future<sb.SyncbaseSyncgroup> joinOrCreateSyncgroup(sb.SyncbaseNoSqlDatabase db,
     String mtAddr, String tableName, String name) async {
-  var mtName = naming.join(mtAddr, 'users/vanadium.bot@gmail.com/apps/mojo-syncbase-example');
+  var mtName = naming.join(
+      mtAddr, 'users/vanadium.bot@gmail.com/apps/mojo-syncbase-example');
   // TODO(nlacasse): Make this %%sync thing a constant.
   var sgPrefix = naming.join(mtName, 'syncbase_mojo/%%sync');
   var sgName = naming.join(sgPrefix, 'testsg');
@@ -156,7 +158,6 @@
       print('ERROR creating syncgroup');
       print(e);
     }
-
   }
 
   return sg;
diff --git a/lib/src/app.dart b/lib/src/app.dart
index 6e56c3b..6c27195 100644
--- a/lib/src/app.dart
+++ b/lib/src/app.dart
@@ -8,33 +8,33 @@
 class SyncbaseApp extends NamedResource {
   // NOTE(sadovsky): For the Mojo Syncbase service, we only store names from app
   // down - i.e. there is no service name.
-  SyncbaseApp._internal(_proxy, relativeName)
-      : super._internal(_proxy, null, relativeName, escape(relativeName));
+  SyncbaseApp._internal(_ctx, relativeName)
+      : super._internal(_ctx, null, relativeName, escape(relativeName));
 
   // noSqlDatabase returns a noSqlDatabase with the given relativeName.
   SyncbaseNoSqlDatabase noSqlDatabase(String relativeName) {
     return new SyncbaseNoSqlDatabase._internal(
-        _proxy, fullName, relativeName, '');
+        _ctx, fullName, relativeName, '');
   }
 
   Future create(mojom.Perms perms) async {
-    var v = await _proxy.ptr.appCreate(fullName, perms);
+    var v = await _ctx.syncbase.appCreate(fullName, perms);
     if (isError(v.err)) throw v.err;
   }
 
   Future destroy() async {
-    var v = await _proxy.ptr.appDestroy(fullName);
+    var v = await _ctx.syncbase.appDestroy(fullName);
     if (isError(v.err)) throw v.err;
   }
 
   Future<bool> exists() async {
-    var v = await _proxy.ptr.appExists(fullName);
+    var v = await _ctx.syncbase.appExists(fullName);
     if (isError(v.err)) throw v.err;
     return v.exists;
   }
 
   Future<mojom.Perms> getPermissions() async {
-    var v = await _proxy.ptr.appGetPermissions(fullName);
+    var v = await _ctx.syncbase.appGetPermissions(fullName);
     if (isError(v.err)) throw v.err;
     // TODO(nlacasse): We need to return the version too.  Create a struct type
     // that combines perms and version?
@@ -42,7 +42,7 @@
   }
 
   Future setPermissions(mojom.Perms perms, String version) async {
-    var v = await _proxy.ptr.appSetPermissions(fullName, perms, version);
+    var v = await _ctx.syncbase.appSetPermissions(fullName, perms, version);
     if (isError(v.err)) throw v.err;
   }
 }
diff --git a/lib/src/client_context.dart b/lib/src/client_context.dart
new file mode 100644
index 0000000..571a9d3
--- /dev/null
+++ b/lib/src/client_context.dart
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+part of syncbase_client;
+
+// ClientContext holds references to objects that other layers need from the
+// SyncbaseClient, e.g. the Mojo proxy and stub manager.
+class ClientContext {
+  // Handle to the Syncbase Mojo proxy.
+  mojom.SyncbaseProxy proxy;
+
+  // Just a convenience getter for the Mojo proxy's Syncbase pointer.
+  mojom.Syncbase get syncbase => proxy.ptr;
+
+  // Handle to the unclosed stubs manager. Used by layers that need to track
+  // Mojo stubs that need to be closed when the Syncbase client is closed.
+  UnclosedStubsManager unclosedStubsManager;
+
+  ClientContext._internal(this.proxy, this.unclosedStubsManager);
+}
diff --git a/lib/src/named_resource.dart b/lib/src/named_resource.dart
index 74175c8..ad4f064 100644
--- a/lib/src/named_resource.dart
+++ b/lib/src/named_resource.dart
@@ -6,14 +6,14 @@
 
 // NamedResource is the superclass of resources with names.
 class NamedResource {
-  final mojom.SyncbaseProxy _proxy;
+  final ClientContext _ctx;
   final String _parentFullName;
   final String name;
   final String fullName;
 
-  NamedResource._internal(mojom.SyncbaseProxy _proxy, String _parentFullName,
-      String name, String fullName)
-      : this._proxy = _proxy,
+  NamedResource._internal(
+      ClientContext _ctx, String _parentFullName, String name, String fullName)
+      : this._ctx = _ctx,
         this._parentFullName = _parentFullName,
         this.name = name,
         this.fullName = fullName {}
diff --git a/lib/src/nosql/database.dart b/lib/src/nosql/database.dart
index d4c4938..2577feb 100644
--- a/lib/src/nosql/database.dart
+++ b/lib/src/nosql/database.dart
@@ -7,38 +7,38 @@
 // TODO(sadovsky): Add listTables method.
 class SyncbaseNoSqlDatabase extends NamedResource {
   SyncbaseNoSqlDatabase._internal(
-      _proxy, _parentFullName, relativeName, batchSuffix)
-      : super._internal(_proxy, _parentFullName, relativeName,
+      _ctx, _parentFullName, relativeName, batchSuffix)
+      : super._internal(_ctx, _parentFullName, relativeName,
             naming.join(_parentFullName, escape(relativeName) + batchSuffix));
 
   // table returns a table with the given relativeName.
   SyncbaseTable table(String relativeName) {
-    return new SyncbaseTable._internal(_proxy, fullName, relativeName);
+    return new SyncbaseTable._internal(_ctx, fullName, relativeName);
   }
 
   // syncgroup returns a syncgroup with the given name.
   SyncbaseSyncgroup syncgroup(String name) {
-    return new SyncbaseSyncgroup._internal(_proxy, this.fullName, name);
+    return new SyncbaseSyncgroup._internal(_ctx, this.fullName, name);
   }
 
   Future<List<String>> getSyncgroupNames() async {
-    var v = await _proxy.ptr.dbGetSyncgroupNames(fullName);
+    var v = await _ctx.syncbase.dbGetSyncgroupNames(fullName);
     if (isError(v.err)) throw v.err;
     return v.names;
   }
 
   Future create(mojom.Perms perms) async {
-    var v = await _proxy.ptr.dbCreate(fullName, perms);
+    var v = await _ctx.syncbase.dbCreate(fullName, perms);
     if (isError(v.err)) throw v.err;
   }
 
   Future destroy() async {
-    var v = await _proxy.ptr.dbDestroy(fullName);
+    var v = await _ctx.syncbase.dbDestroy(fullName);
     if (isError(v.err)) throw v.err;
   }
 
   Future<bool> exists() async {
-    var v = await _proxy.ptr.dbExists(fullName);
+    var v = await _ctx.syncbase.dbExists(fullName);
     if (isError(v.err)) throw v.err;
     return v.exists;
   }
@@ -47,10 +47,12 @@
     StreamController<mojom.Result> sc = new StreamController();
 
     mojom.ExecStreamStub stub = new mojom.ExecStreamStub.unbound();
-    stub.impl = new ExecStreamImpl._fromStreamController(sc);
+    stub.impl = new ExecStreamImpl._fromStreamController(_ctx, sc, stub);
+
+    _ctx.unclosedStubsManager.register(stub);
 
     // Call dbExec asynchronously.
-    _proxy.ptr.dbExec(fullName, query, stub).then((v) {
+    _ctx.syncbase.dbExec(fullName, query, stub).then((v) {
       // TODO(nlacasse): Same question regarding throwing behavior as TableScan.
       if (isError(v.err)) throw v.err;
     });
@@ -63,7 +65,10 @@
     StreamController<mojom.WatchChange> sc = new StreamController();
 
     mojom.WatchGlobStreamStub stub = new mojom.WatchGlobStreamStub.unbound();
-    stub.impl = new WatchGlobStreamImpl._fromStreamController(sc);
+
+    stub.impl = new WatchGlobStreamImpl._fromStreamController(_ctx, sc, stub);
+
+    _ctx.unclosedStubsManager.register(stub);
 
     // TODO(aghassemi): Implement naming utilities such as Join in Dart and use them instead.
     var pattern = naming.join(tableName, prefix) + '*';
@@ -72,7 +77,7 @@
     req.resumeMarker = resumeMarker;
 
     // Call dbWatch asynchronously.
-    _proxy.ptr.dbWatchGlob(fullName, req, stub).then((c) {
+    _ctx.syncbase.dbWatchGlob(fullName, req, stub).then((c) {
       // TODO(nlacasse): Same question regarding throwing behavior as TableScan.
       if (isError(c.err)) throw c.err;
     });
@@ -81,36 +86,36 @@
   }
 
   Future<List<int>> getResumeMarker() async {
-    var v = await _proxy.ptr.dbGetResumeMarker(fullName);
+    var v = await _ctx.syncbase.dbGetResumeMarker(fullName);
     if (isError(v.err)) throw v.err;
     return v.resumeMarker;
   }
 
   Future<List<SyncbaseTable>> listTables() async {
-    var v = await _proxy.ptr.dbListTables(fullName);
+    var v = await _ctx.syncbase.dbListTables(fullName);
     if (isError(v.err)) throw v.err;
 
     return v.tables.map((tableName) => this.table(tableName)).toList();
   }
 
   Future<String> beginBatch(mojom.BatchOptions opts) async {
-    var v = await _proxy.ptr.dbBeginBatch(fullName, opts);
+    var v = await _ctx.syncbase.dbBeginBatch(fullName, opts);
     if (isError(v.err)) throw v.err;
     return v.batchSuffix;
   }
 
   Future commit() async {
-    var v = await _proxy.ptr.dbCommit(fullName);
+    var v = await _ctx.syncbase.dbCommit(fullName);
     if (isError(v.err)) throw v.err;
   }
 
   Future abort() async {
-    var v = await _proxy.ptr.dbAbort(fullName);
+    var v = await _ctx.syncbase.dbAbort(fullName);
     if (isError(v.err)) throw v.err;
   }
 
   Future<mojom.Perms> getPermissions() async {
-    var v = await _proxy.ptr.dbGetPermissions(fullName);
+    var v = await _ctx.syncbase.dbGetPermissions(fullName);
     if (isError(v.err)) throw v.err;
     // TODO(nlacasse): We need to return the version too.  Create a struct type
     // that combines perms and version?
@@ -118,7 +123,7 @@
   }
 
   Future setPermissions(mojom.Perms perms, String version) async {
-    var v = await _proxy.ptr.dbSetPermissions(fullName, perms, version);
+    var v = await _ctx.syncbase.dbSetPermissions(fullName, perms, version);
     if (isError(v.err)) throw v.err;
   }
 }
@@ -126,10 +131,12 @@
 class ExecStreamImpl extends Object
     with StreamFlowControl
     implements mojom.ExecStream {
-  final StreamController<mojom.Result> sc;
+  final ClientContext _ctx;
+  final StreamController<mojom.Result> _sc;
+  final mojom.ExecStreamStub _stub;
 
-  ExecStreamImpl._fromStreamController(this.sc) {
-    initFlowControl(this.sc);
+  ExecStreamImpl._fromStreamController(this._ctx, this._sc, this._stub) {
+    initFlowControl(this._sc);
   }
 
   onResult(mojom.Result result, [Function newAck = null]) {
@@ -138,7 +145,7 @@
     if (newAck == null) {
       throw new ArgumentError('newAck can not be null');
     }
-    sc.add(result);
+    _sc.add(result);
 
     // Only ack after we get unlocked.
     // If we are not locked, onNextUnlock returns immediately.
@@ -148,19 +155,22 @@
   // Called by the mojo proxy when the Go function call returns.
   onDone(mojom.Error err) {
     if (isError(err)) {
-      sc.addError(err);
+      _sc.addError(err);
     }
-    sc.close();
+    _sc.close();
+    _ctx.unclosedStubsManager.close(_stub);
   }
 }
 
 class WatchGlobStreamImpl extends Object
     with StreamFlowControl
     implements mojom.WatchGlobStream {
-  final StreamController<mojom.WatchChange> sc;
+  final ClientContext _ctx;
+  final StreamController<mojom.WatchChange> _sc;
+  final mojom.WatchGlobStreamStub _stub;
 
-  WatchGlobStreamImpl._fromStreamController(this.sc) {
-    initFlowControl(this.sc);
+  WatchGlobStreamImpl._fromStreamController(this._ctx, this._sc, this._stub) {
+    initFlowControl(this._sc);
   }
 
   Future onChange(mojom.WatchChange change, [Function newAck = null]) {
@@ -174,7 +184,7 @@
       testing.DatabaseWatch.onChangeCounter.increment();
     }
 
-    sc.add(change);
+    _sc.add(change);
 
     // Only ack after we get unlocked.
     // If we are not locked, onNextUnlock returns immediately.
@@ -185,8 +195,9 @@
   // Watch technically never returns unless there is an error.
   onError(mojom.Error err) {
     if (isError(err)) {
-      sc.addError(err);
+      _sc.addError(err);
     }
-    sc.close();
+    _sc.close();
+    _ctx.unclosedStubsManager.close(_stub);
   }
 }
diff --git a/lib/src/nosql/row.dart b/lib/src/nosql/row.dart
index 293fe5a..f348081 100644
--- a/lib/src/nosql/row.dart
+++ b/lib/src/nosql/row.dart
@@ -7,30 +7,30 @@
 class SyncbaseRow extends NamedResource {
   final String key;
 
-  SyncbaseRow._internal(_proxy, _parentFullName, key)
-      : super._internal(_proxy, _parentFullName, key,
+  SyncbaseRow._internal(_ctx, _parentFullName, key)
+      : super._internal(_ctx, _parentFullName, key,
             naming.join(_parentFullName, escape(key))),
         this.key = key;
 
   Future<bool> exists() async {
-    var v = await _proxy.ptr.rowExists(fullName);
+    var v = await _ctx.syncbase.rowExists(fullName);
     if (isError(v.err)) throw v.err;
     return v.exists;
   }
 
   Future<List<int>> get() async {
-    var v = await _proxy.ptr.rowGet(fullName);
+    var v = await _ctx.syncbase.rowGet(fullName);
     if (isError(v.err)) throw v.err;
     return v.value;
   }
 
   Future put(List<int> value) async {
-    var v = await _proxy.ptr.rowPut(fullName, value);
+    var v = await _ctx.syncbase.rowPut(fullName, value);
     if (isError(v.err)) throw v.err;
   }
 
   Future delete() async {
-    var v = await _proxy.ptr.rowDelete(fullName);
+    var v = await _ctx.syncbase.rowDelete(fullName);
     if (isError(v.err)) throw v.err;
   }
 }
diff --git a/lib/src/nosql/syncgroup.dart b/lib/src/nosql/syncgroup.dart
index 32f945d..9c8321c 100644
--- a/lib/src/nosql/syncgroup.dart
+++ b/lib/src/nosql/syncgroup.dart
@@ -7,38 +7,38 @@
 class SyncbaseSyncgroup {
   final String name;
   final String _dbName;
-  final mojom.SyncbaseProxy _proxy;
+  final ClientContext _ctx;
 
-  SyncbaseSyncgroup._internal(this._proxy, this._dbName, this.name);
+  SyncbaseSyncgroup._internal(this._ctx, this._dbName, this.name);
 
   Future create(
       mojom.SyncgroupSpec spec, mojom.SyncgroupMemberInfo myInfo) async {
-    var v = await _proxy.ptr.dbCreateSyncgroup(_dbName, name, spec, myInfo);
+    var v = await _ctx.syncbase.dbCreateSyncgroup(_dbName, name, spec, myInfo);
     if (isError(v.err)) throw v.err;
   }
 
   Future join(mojom.SyncgroupMemberInfo myInfo) async {
-    var v = await _proxy.ptr.dbJoinSyncgroup(_dbName, name, myInfo);
+    var v = await _ctx.syncbase.dbJoinSyncgroup(_dbName, name, myInfo);
     if (isError(v.err)) throw v.err;
   }
 
   Future leave() async {
-    var v = await _proxy.ptr.dbLeaveSyncgroup(_dbName, name);
+    var v = await _ctx.syncbase.dbLeaveSyncgroup(_dbName, name);
     if (isError(v.err)) throw v.err;
   }
 
   Future destroy() async {
-    var v = await _proxy.ptr.dbDestroySyncgroup(_dbName, name);
+    var v = await _ctx.syncbase.dbDestroySyncgroup(_dbName, name);
     if (isError(v.err)) throw v.err;
   }
 
   Future eject(String memberName) async {
-    var v = await _proxy.ptr.dbEjectFromSyncgroup(_dbName, name, memberName);
+    var v = await _ctx.syncbase.dbEjectFromSyncgroup(_dbName, name, memberName);
     if (isError(v.err)) throw v.err;
   }
 
   Future<mojom.SyncgroupSpec> getSpec() async {
-    var v = await _proxy.ptr.dbGetSyncgroupSpec(_dbName, name);
+    var v = await _ctx.syncbase.dbGetSyncgroupSpec(_dbName, name);
     if (isError(v.err)) throw v.err;
     // TODO(nlacasse): We need to return the version too.  Create a struct type
     // that combines spec and version?
@@ -46,12 +46,13 @@
   }
 
   Future setSpec(mojom.SyncgroupSpec spec, String version) async {
-    var v = await _proxy.ptr.dbSetSyncgroupSpec(_dbName, name, spec, version);
+    var v =
+        await _ctx.syncbase.dbSetSyncgroupSpec(_dbName, name, spec, version);
     if (isError(v.err)) throw v.err;
   }
 
   Future<Map<String, mojom.SyncgroupMemberInfo>> getMembers() async {
-    var v = await _proxy.ptr.dbGetSyncgroupMembers(_dbName, name);
+    var v = await _ctx.syncbase.dbGetSyncgroupMembers(_dbName, name);
     if (isError(v.err)) throw v.err;
     return v.infos;
   }
diff --git a/lib/src/nosql/table.dart b/lib/src/nosql/table.dart
index 67d4d61..95fe265 100644
--- a/lib/src/nosql/table.dart
+++ b/lib/src/nosql/table.dart
@@ -5,39 +5,39 @@
 part of syncbase_client;
 
 class SyncbaseTable extends NamedResource {
-  SyncbaseTable._internal(_proxy, _parentFullName, relativeName)
-      : super._internal(_proxy, _parentFullName, relativeName,
+  SyncbaseTable._internal(_ctx, _parentFullName, relativeName)
+      : super._internal(_ctx, _parentFullName, relativeName,
             naming.join(_parentFullName, escape(relativeName)));
 
   // row returns a row with the given key.
   SyncbaseRow row(String key) {
-    return new SyncbaseRow._internal(_proxy, fullName, key);
+    return new SyncbaseRow._internal(_ctx, fullName, key);
   }
 
   Future create(mojom.Perms perms) async {
-    var v = await _proxy.ptr.tableCreate(fullName, perms);
+    var v = await _ctx.syncbase.tableCreate(fullName, perms);
     if (isError(v.err)) throw v.err;
   }
 
   Future destroy() async {
-    var v = await _proxy.ptr.tableDestroy(fullName);
+    var v = await _ctx.syncbase.tableDestroy(fullName);
     if (isError(v.err)) throw v.err;
   }
 
   Future<bool> exists() async {
-    var v = await _proxy.ptr.tableExists(fullName);
+    var v = await _ctx.syncbase.tableExists(fullName);
     if (isError(v.err)) throw v.err;
     return v.exists;
   }
 
   Future<mojom.Perms> getPermissions() async {
-    var v = await _proxy.ptr.tableGetPermissions(fullName);
+    var v = await _ctx.syncbase.tableGetPermissions(fullName);
     if (isError(v.err)) throw v.err;
     return v.perms;
   }
 
   Future setPermissions(mojom.Perms perms) async {
-    var v = await _proxy.ptr.tableSetPermissions(fullName, perms);
+    var v = await _ctx.syncbase.tableSetPermissions(fullName, perms);
     if (isError(v.err)) throw v.err;
   }
 
@@ -54,8 +54,8 @@
   }
 
   Future deleteRange(RowRange range) async {
-    var v =
-        await _proxy.ptr.tableDeleteRange(fullName, range.start, range.limit);
+    var v = await _ctx.syncbase
+        .tableDeleteRange(fullName, range.start, range.limit);
     if (isError(v.err)) throw v.err;
   }
 
@@ -63,10 +63,12 @@
     StreamController<mojom.KeyValue> sc = new StreamController();
 
     mojom.ScanStreamStub stub = new mojom.ScanStreamStub.unbound();
-    stub.impl = new ScanStreamImpl._fromStreamController(sc);
+    stub.impl = new ScanStreamImpl._fromStreamController(_ctx, sc, stub);
+
+    _ctx.unclosedStubsManager.register(stub);
 
     // Call tableScan asynchronously.
-    _proxy.ptr.tableScan(fullName, range.start, range.limit, stub).then((v) {
+    _ctx.syncbase.tableScan(fullName, range.start, range.limit, stub).then((v) {
       // TODO(nlacasse): Is throwing the correct behavior here?  Consider
       // returning a tuple (Stream<mojom.KeyValue>, Future) and resolve the
       // Future at the end of the RPC (with an error if applicable).  Then
@@ -81,18 +83,19 @@
   }
 
   Future<List<mojom.PrefixPerms>> getPrefixPermissions(String key) async {
-    var v = await _proxy.ptr.tableGetPrefixPermissions(fullName, key);
+    var v = await _ctx.syncbase.tableGetPrefixPermissions(fullName, key);
     if (isError(v.err)) throw v.err;
     return v.permsArr;
   }
 
   Future setPrefixPermissions(String prefix, mojom.Perms perms) async {
-    var v = await _proxy.ptr.tableSetPrefixPermissions(fullName, prefix, perms);
+    var v =
+        await _ctx.syncbase.tableSetPrefixPermissions(fullName, prefix, perms);
     if (isError(v.err)) throw v.err;
   }
 
   Future deletePrefixPermissions(String prefix) async {
-    var v = await _proxy.ptr.tableDeletePrefixPermissions(fullName, prefix);
+    var v = await _ctx.syncbase.tableDeletePrefixPermissions(fullName, prefix);
     if (isError(v.err)) throw v.err;
   }
 }
@@ -100,10 +103,12 @@
 class ScanStreamImpl extends Object
     with StreamFlowControl
     implements mojom.ScanStream {
-  final StreamController<mojom.KeyValue> sc;
+  final ClientContext _ctx;
+  final StreamController<mojom.KeyValue> _sc;
+  final mojom.ScanStreamStub _stub;
 
-  ScanStreamImpl._fromStreamController(this.sc) {
-    initFlowControl(this.sc);
+  ScanStreamImpl._fromStreamController(this._ctx, this._sc, this._stub) {
+    initFlowControl(this._sc);
   }
 
   Future onKeyValue(mojom.KeyValue keyValue, [Function newAck = null]) {
@@ -113,7 +118,7 @@
       throw new ArgumentError('newAck can not be null');
     }
 
-    sc.add(keyValue);
+    _sc.add(keyValue);
 
     // Only ack after we get unlocked.
     // If we are not locked, onNextUnlock returns immediately.
@@ -123,8 +128,9 @@
   // Called by the mojom proxy when the Go function call returns.
   onDone(mojom.Error err) {
     if (isError(err)) {
-      sc.addError(err);
+      _sc.addError(err);
     }
-    sc.close();
+    _sc.close();
+    _ctx.unclosedStubsManager.close(_stub);
   }
 }
diff --git a/lib/src/unclosed_stubs_manager.dart b/lib/src/unclosed_stubs_manager.dart
new file mode 100644
index 0000000..b154b87
--- /dev/null
+++ b/lib/src/unclosed_stubs_manager.dart
@@ -0,0 +1,30 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+part of syncbase_client;
+
+// UnclosedStubsManager allows different layers to keep track of the Mojo stubs
+// that need to be closed when the Syncbase client is closed.
+class UnclosedStubsManager {
+  List _stubs = [];
+
+  UnclosedStubsManager._internal();
+
+  register(stub) {
+    _stubs.add(stub);
+  }
+
+  close(stub) {
+    bool exists = _stubs.remove(stub);
+    if (!exists) {
+      throw new ArgumentError.value(stub, 'stub',
+          'Does not exist. Please ensure it is registered before calling close.');
+    }
+    stub.close();
+  }
+
+  Future closeAll({immediate: false}) {
+    return Future.wait(_stubs.map((s) => s.close(immediate: immediate)));
+  }
+}
diff --git a/lib/syncbase_client.dart b/lib/syncbase_client.dart
index 646dc1a..e259b51 100644
--- a/lib/syncbase_client.dart
+++ b/lib/syncbase_client.dart
@@ -25,8 +25,10 @@
         WatchChange;
 
 part 'src/app.dart';
+part 'src/client_context.dart';
 part 'src/named_resource.dart';
 part 'src/stream_flow_control.dart';
+part 'src/unclosed_stubs_manager.dart';
 part 'src/util.dart';
 part 'src/nosql/database.dart';
 part 'src/nosql/row.dart';
@@ -43,26 +45,29 @@
 
 // TODO(sadovsky): Add listApps method.
 class SyncbaseClient {
-  final mojom.SyncbaseProxy _proxy;
+  final ClientContext _ctx;
 
   SyncbaseClient(ConnectToServiceFn cts, String url)
-      : _proxy = new mojom.SyncbaseProxy.unbound() {
+      : _ctx = new ClientContext._internal(new mojom.SyncbaseProxy.unbound(),
+            new UnclosedStubsManager._internal()) {
     print('connecting to $url');
-    cts(url, _proxy);
+    cts(url, _ctx.proxy);
     print('connected');
   }
 
   // Closes the connection to the syncbase server.
-  // TODO(nlacasse): Is this necessary?
   Future close({bool immediate: false}) {
-    return _proxy.close(immediate: immediate);
+    return Future.wait([
+      _ctx.unclosedStubsManager.closeAll(immediate: immediate),
+      _ctx.proxy.close(immediate: immediate)
+    ]);
   }
 
   // app returns the app with the given name.
-  SyncbaseApp app(String name) => new SyncbaseApp._internal(_proxy, name);
+  SyncbaseApp app(String name) => new SyncbaseApp._internal(_ctx, name);
 
   Future<mojom.Perms> getPermissions() async {
-    var v = await _proxy.ptr.serviceGetPermissions();
+    var v = await _ctx.syncbase.serviceGetPermissions();
     if (isError(v.err)) throw v.err;
     // TODO(nlacasse): We need to return the version too.  Create a struct type
     // that combines perms and version?
@@ -70,7 +75,7 @@
   }
 
   Future setPermissions(mojom.Perms perms, String version) async {
-    var v = await _proxy.ptr.serviceSetPermissions(perms, version);
+    var v = await _ctx.syncbase.serviceSetPermissions(perms, version);
     if (isError(v.err)) throw v.err;
   }
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 000a0a6..1089aa1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -2,7 +2,7 @@
 description: Syncbase is a storage system for developers that makes it easy to synchronize app data between devices. It works even when devices are not connected to the Internet.
 homepage: https://github.com/vanadium/mojo.syncbase
 name: syncbase
-version: 0.0.6
+version: 0.0.8
 dependencies:
   mojo: '>=0.3.0 <0.4.0'
 dev_dependencies:
diff --git a/test/integration/syncbase_test.dart b/test/integration/syncbase_test.dart
index ab2a57e..1f1ddf0 100755
--- a/test/integration/syncbase_test.dart
+++ b/test/integration/syncbase_test.dart
@@ -1,4 +1,4 @@
-#!mojo mojo:dart_content_handler
+#!mojo mojo:dart_content_handler?strict=true
 // Copyright 2015 The Vanadium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
@@ -9,7 +9,8 @@
 import 'package:test/test.dart';
 
 import 'package:syncbase/src/testing_instrumentation.dart' as testing;
-import 'package:syncbase/initialized_application.dart' show InitializedApplication;
+import 'package:syncbase/initialized_application.dart'
+    show InitializedApplication;
 import 'package:syncbase/syncbase_client.dart' show SyncbaseClient;
 
 // Import other test files.
@@ -45,14 +46,18 @@
   // callback.  See https://github.com/dart-lang/test/issues/18.
   test('terminate shell connection', () async {
     await c.close();
-    expect(MojoHandle.reportLeakedHandles(), isTrue);
 
-    // TODO(nlacasse): When running mojo_shell with --enable-multiprocess,
-    // closing the application causes a non-graceful shutdown.  To avoid this,
+    // TODO(aghassemi): Reporting on handle leaks is blocked on
+    // a bug (https://github.com/domokit/mojo/issues/487) where mojo_test is not
+    // exiting gracefully with --enable-multiprocess. To avoid this,
     // we sleep for a second so the test reporter can finish and print the
-    // results before we close the app.  Once mojo_shell can shut down more
-    // gracefully, we should call app.close directly in the test and not in
-    // this Timer.
+    // results before we close the app.
+    // Currently there is no way to check for leaks since if we don't wait for
+    // app.close(), there will always be leaks and if we do wait, test crashes
+    // due to the bug and we can't check leaks anyway.
+    // Enable this and remote the timer as soon as that bug is fixed.
+    //await app.close();
+    //expect(MojoHandle.reportLeakedHandles(), isTrue);
     new Timer(new Duration(seconds: 1), app.close);
   });
 }