syncslides: Using the new discovery API.

MultiPart: 2/2
Change-Id: I49f80544db928063f8f39917e29e960c26d489a0
diff --git a/dart/lib/discovery/client.dart b/dart/lib/discovery/client.dart
index 1274f86..b5c8ecd 100644
--- a/dart/lib/discovery/client.dart
+++ b/dart/lib/discovery/client.dart
@@ -6,199 +6,119 @@
 
 import 'package:flutter/services.dart' show shell;
 import 'package:logging/logging.dart';
-import 'package:v23discovery/discovery.dart' as v23discovery;
+import 'package:v23discovery/discovery.dart' as discovery;
 
 import '../models/all.dart' as model;
 
 final Logger log = new Logger('discovery/client');
 
-const String v23DiscoveryMojoUrl =
+const String _discoveryMojoUrl =
     'https://discovery.syncslides.mojo.v.io/discovery.mojo';
 
 // TODO(aghassemi): We should make this the same between Flutter and Java apps when
 // they can actually talk to each other.
-const String presentationInterfaceName =
+const String _presentationInterfaceName =
     'v.io/release/projects/syncslides/dart/presentation';
 
-StreamController<model.PresentationAdvertisement> _onFoundEmitter =
-    new StreamController.broadcast();
-StreamController<String> _onLostEmitter = new StreamController.broadcast();
+final discovery.Client _discoveryClient =
+    new discovery.Client(shell.connectToService, _discoveryMojoUrl);
 
-Stream onFound = _onFoundEmitter.stream;
-Stream onLost = _onLostEmitter.stream;
-
-// TODO(aghassemi): v23discovery could really use a Dart client library.
-// Keep proxy, handle pairs so we can cancel calls later.
-ProxyResponseFuturePair<v23discovery.ScannerProxy,
-    v23discovery.ScannerScanResponseParams> _scanCall;
-
-Map<
-        String,
-        ProxyResponseFuturePair<v23discovery.AdvertiserProxy,
-            v23discovery.AdvertiserAdvertiseResponseParams>> _advertiseCalls =
-    new Map();
+final Map<String, discovery.Advertiser> _advertisers = new Map();
+discovery.Scanner _scanner = null;
 
 Future advertise(model.PresentationAdvertisement presentation) async {
-  log.info('Started advertising ${presentation.deck.name}.');
-  if (_advertiseCalls.containsKey(presentation.key)) {
-    // We are already advertising for this presentation.
-    return _advertiseCalls[presentation.key].responseFuture;
+  if (_advertisers.containsKey(presentation.key)) {
+    return _advertisers[presentation.key];
   }
 
+  log.info('Started advertising ${presentation.deck.name}.');
+
   Map<String, String> serviceAttrs = new Map();
   serviceAttrs['deckid'] = presentation.deck.key;
   serviceAttrs['name'] = presentation.deck.name;
   serviceAttrs['thumbnailkey'] = presentation.deck.thumbnail.key;
   serviceAttrs['presentationid'] = presentation.key;
-  v23discovery.Service serviceInfo = new v23discovery.Service()
-    ..interfaceName = presentationInterfaceName
+  discovery.Service service = new discovery.Service()
+    ..interfaceName = _presentationInterfaceName
     ..instanceName = presentation.key
     ..attrs = serviceAttrs
     ..addrs = [presentation.syncgroupName, presentation.thumbnailSyncgroupName];
 
-  v23discovery.AdvertiserProxy advertiser =
-      new v23discovery.AdvertiserProxy.unbound();
-  shell.connectToService(v23DiscoveryMojoUrl, advertiser);
-  Future advertiseResponseFuture = advertiser.ptr.advertise(serviceInfo, null);
-  _advertiseCalls[presentation.key] =
-      new ProxyResponseFuturePair(advertiser, advertiseResponseFuture);
-
-  v23discovery.AdvertiserAdvertiseResponseParams result =
-      await advertiseResponseFuture;
-  if (result.err != null) {
-    throw result.err;
-  }
+  _advertisers[presentation.key] = await _discoveryClient.advertise(service);
 
   log.info('Advertised ${presentation.deck.name} under ${presentation.key}.');
 }
 
-// Tracks advertisements that are in the middle of being stopped.
-Map<String, Future> _stoppingAdvertisingCalls = new Map<String, Future>();
 Future stopAdvertising(String presentationId) async {
-  if (!_advertiseCalls.containsKey(presentationId)) {
+  if (!_advertisers.containsKey(presentationId)) {
     // Not advertised, nothing to stop.
-    return new Future.value();
+    return;
   }
-
-  if (_stoppingAdvertisingCalls.containsKey(presentationId)) {
-    // Already stopping, return the exiting call future.
-    return _stoppingAdvertisingCalls[presentationId];
-  }
-
-  stop() async {
-    v23discovery.AdvertiserAdvertiseResponseParams advertiserResponse =
-        await _advertiseCalls[presentationId].responseFuture;
-
-    await _advertiseCalls[presentationId]
-        .proxy
-        .ptr
-        .stop(advertiserResponse.handle);
-    await _advertiseCalls[presentationId].proxy.close();
-  }
-
-  Future stoppingCall = stop();
-  _stoppingAdvertisingCalls[presentationId] = stoppingCall;
-
-  stoppingCall.then((_) {
-    _advertiseCalls.remove(presentationId);
-    log.info('Stopped advertising ${presentationId}.');
-  }).catchError((e) {
-    _stoppingAdvertisingCalls.remove(presentationId);
-    throw e;
-  });
+  await _advertisers[presentationId].stop();
+  _advertisers.remove(presentationId);
 }
 
-Future startScan() async {
-  if (_scanCall != null) {
-    // We are already scanning.
-    return _scanCall.responseFuture;
+// TODO(aghassemi): Remove use once
+// https://github.com/vanadium/issues/issues/1071 is resolved
+// Currently we need to keep this mapping since discovery's lost event only
+// contains an auto generated instanceId which we need to map back to presentationId.
+Map<String, String> instanceIdToPresentationIdMap = new Map();
+
+// Transforms a stream of discovery services to PresentationAdvertisement model objects.
+StreamTransformer toPresentation = new StreamTransformer.fromHandlers(
+    handleData:
+        (discovery.Service s, EventSink<model.PresentationAdvertisement> sink) {
+  String key = s.attrs['presentationid'];
+  instanceIdToPresentationIdMap[s.instanceId] = key;
+  log.info('Found presentation ${s.attrs['name']} under $key.');
+  // Ignore our own advertised services.
+  if (_advertisers.containsKey(key)) {
+    log.info('Presentation ${s.attrs['name']} was advertised by us; ignoring.');
+    return;
   }
 
-  var scanner = new v23discovery.ScannerProxy.unbound();
-  shell.connectToService(v23DiscoveryMojoUrl, scanner);
-  v23discovery.ScanHandlerStub handlerStub =
-      new v23discovery.ScanHandlerStub.unbound();
-  handlerStub.impl = new ScanHandler();
+  model.Deck deck = new model.Deck(s.attrs['deckid'], s.attrs['name'],
+      new model.BlobRef(s.attrs['thumbnailkey']));
+  var syncgroupName = s.addrs[0];
+  var thumbnailSyncgroupName = s.addrs[1];
+  model.PresentationAdvertisement presentation =
+      new model.PresentationAdvertisement(
+          key, deck, syncgroupName, thumbnailSyncgroupName);
 
-  var query = 'v.InterfaceName = "$presentationInterfaceName"';
-  var scannerResponseFuture = scanner.ptr.scan(query, handlerStub);
-  _scanCall = new ProxyResponseFuturePair(scanner, scannerResponseFuture);
+  sink.add(presentation);
+});
 
-  v23discovery.ScannerScanResponseParams result = await scannerResponseFuture;
-  if (result.err != null) {
-    throw result.err;
+// Transforms a stream of instanceIds to presentationIds.
+StreamTransformer toPresentationId = new StreamTransformer.fromHandlers(
+    handleData: (String instanceId, EventSink<String> sink) {
+  String presentationId = instanceIdToPresentationIdMap[instanceId];
+  sink.add(presentationId);
+});
+
+Future<PresentationScanner> scan() async {
+  if (_scanner != null) {
+    return _scanner;
   }
+  var query = 'v.InterfaceName = "$_presentationInterfaceName"';
+  _scanner = await _discoveryClient.scan(query);
+
   log.info('Scan started.');
+  return new PresentationScanner._internal(
+      _scanner.onFound.transform(toPresentation),
+      _scanner.onLost.transform(toPresentationId));
 }
 
-// Tracks whether we are already in the middle of stopping scan.
-Future _stoppingScanCall;
 Future stopScan() async {
-  if (_scanCall == null) {
-    // No scan call has been made before or scan is already being stopped.
-    return new Future.value();
+  if (_scanner == null) {
+    // No scan call has been made before.
+    return;
   }
-
-  if (_stoppingScanCall != null) {
-    // Already stopping, return the exiting call future.
-    return _stoppingScanCall;
-  }
-
-  stop() async {
-    v23discovery.ScannerScanResponseParams scannerResponse =
-        await _scanCall.responseFuture;
-
-    await _scanCall.proxy.ptr.stop(scannerResponse.handle);
-    await _scanCall.proxy.close();
-  }
-
-  _stoppingScanCall = stop();
-
-  _stoppingScanCall.then((_) {
-    _scanCall = null;
-    log.info('Scan stopped.');
-  }).catchError((e) {
-    _stoppingScanCall = null;
-    throw e;
-  });
+  await _scanner.stop();
+  _scanner = null;
 }
 
-class ScanHandler extends v23discovery.ScanHandler {
-  Map<String, String> instanceIdToPresentationIdMap = new Map();
-  found(v23discovery.Service s) async {
-    String key = s.attrs['presentationid'];
-    instanceIdToPresentationIdMap[s.instanceId] = key;
-    log.info('Found presentation ${s.attrs['name']} under $key.');
-    // Ignore our own advertised services.
-    if (_advertiseCalls.containsKey(key)) {
-      log.info(
-          'Presentation ${s.attrs['name']} was advertised by us; ignoring.');
-      return;
-    }
-
-    model.Deck deck = new model.Deck(s.attrs['deckid'], s.attrs['name'],
-        new model.BlobRef(s.attrs['thumbnailkey']));
-    var syncgroupName = s.addrs[0];
-    var thumbnailSyncgroupName = s.addrs[1];
-    model.PresentationAdvertisement presentation =
-        new model.PresentationAdvertisement(
-            key, deck, syncgroupName, thumbnailSyncgroupName);
-
-    _onFoundEmitter.add(presentation);
-  }
-
-  lost(String instanceId) {
-    String presentationId = instanceIdToPresentationIdMap[instanceId];
-    if (presentationId == null) {
-      return;
-    }
-    log.info('Lost presentation $presentationId.');
-    _onLostEmitter.add(presentationId);
-  }
-}
-
-class ProxyResponseFuturePair<T1, T2> {
-  final T1 proxy;
-  final Future<T2> responseFuture;
-  ProxyResponseFuturePair(this.proxy, this.responseFuture);
+class PresentationScanner {
+  Stream<model.PresentationAdvertisement> onFound;
+  Stream<String> onLost;
+  PresentationScanner._internal(this.onFound, this.onLost);
 }
diff --git a/dart/lib/stores/syncbase/store.dart b/dart/lib/stores/syncbase/store.dart
index 6d08108..567bac5 100644
--- a/dart/lib/stores/syncbase/store.dart
+++ b/dart/lib/stores/syncbase/store.dart
@@ -76,7 +76,9 @@
   void _triggerStateChange() => _stateChangeEmitter.add(_state);
 
   Future _startScanningForPresentations() async {
-    discovery.onFound.listen((model.PresentationAdvertisement newP) {
+    discovery.PresentationScanner scanner = await discovery.scan();
+
+    scanner.onFound.listen((model.PresentationAdvertisement newP) {
       _state._presentationsAdvertisements[newP.key] = newP;
       _triggerStateChange();
 
@@ -87,7 +89,7 @@
       sb.joinSyncgroup(sgName);
     });
 
-    discovery.onLost.listen((String presentationId) {
+    scanner.onLost.listen((String presentationId) {
       _state._presentationsAdvertisements.remove(presentationId);
       _state._decks.values.forEach((_DeckState deck) {
         if (deck.presentation != null &&
@@ -97,8 +99,6 @@
       });
       _triggerStateChange();
     });
-
-    discovery.startScan();
   }
 
   Future _getInitialValuesAndStartWatching(String table) async {
diff --git a/dart/pubspec.lock b/dart/pubspec.lock
index 1674050..347d63f 100644
--- a/dart/pubspec.lock
+++ b/dart/pubspec.lock
@@ -258,7 +258,7 @@
   v23discovery:
     description: v23discovery
     source: hosted
-    version: "0.0.9"
+    version: "0.0.10"
   vector_math:
     description: vector_math
     source: hosted
diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml
index f0cccde..9e9b207 100644
--- a/dart/pubspec.yaml
+++ b/dart/pubspec.yaml
@@ -6,7 +6,7 @@
   logging: ">=0.11.2 <0.12.0"
   mojo_services: ">=0.4.5 <0.5.0"
   syncbase: ">=0.0.27 <0.1.0"
-  v23discovery: ">=0.0.9 < 0.1.0"
+  v23discovery: ">=0.0.10 < 0.1.0"
   uuid: ">=0.5.0 <0.6.0"
 dev_dependencies:
   flutter_tools: