syncslides: Using the new discovery API.
MultiPart: 2/2
Change-Id: I49f80544db928063f8f39917e29e960c26d489a0
diff --git a/dart/lib/discovery/client.dart b/dart/lib/discovery/client.dart
index 1274f86..b5c8ecd 100644
--- a/dart/lib/discovery/client.dart
+++ b/dart/lib/discovery/client.dart
@@ -6,199 +6,119 @@
import 'package:flutter/services.dart' show shell;
import 'package:logging/logging.dart';
-import 'package:v23discovery/discovery.dart' as v23discovery;
+import 'package:v23discovery/discovery.dart' as discovery;
import '../models/all.dart' as model;
final Logger log = new Logger('discovery/client');
-const String v23DiscoveryMojoUrl =
+const String _discoveryMojoUrl =
'https://discovery.syncslides.mojo.v.io/discovery.mojo';
// TODO(aghassemi): We should make this the same between Flutter and Java apps when
// they can actually talk to each other.
-const String presentationInterfaceName =
+const String _presentationInterfaceName =
'v.io/release/projects/syncslides/dart/presentation';
-StreamController<model.PresentationAdvertisement> _onFoundEmitter =
- new StreamController.broadcast();
-StreamController<String> _onLostEmitter = new StreamController.broadcast();
+final discovery.Client _discoveryClient =
+ new discovery.Client(shell.connectToService, _discoveryMojoUrl);
-Stream onFound = _onFoundEmitter.stream;
-Stream onLost = _onLostEmitter.stream;
-
-// TODO(aghassemi): v23discovery could really use a Dart client library.
-// Keep proxy, handle pairs so we can cancel calls later.
-ProxyResponseFuturePair<v23discovery.ScannerProxy,
- v23discovery.ScannerScanResponseParams> _scanCall;
-
-Map<
- String,
- ProxyResponseFuturePair<v23discovery.AdvertiserProxy,
- v23discovery.AdvertiserAdvertiseResponseParams>> _advertiseCalls =
- new Map();
+final Map<String, discovery.Advertiser> _advertisers = new Map();
+discovery.Scanner _scanner = null;
Future advertise(model.PresentationAdvertisement presentation) async {
- log.info('Started advertising ${presentation.deck.name}.');
- if (_advertiseCalls.containsKey(presentation.key)) {
- // We are already advertising for this presentation.
- return _advertiseCalls[presentation.key].responseFuture;
+ if (_advertisers.containsKey(presentation.key)) {
+ return _advertisers[presentation.key];
}
+ log.info('Started advertising ${presentation.deck.name}.');
+
Map<String, String> serviceAttrs = new Map();
serviceAttrs['deckid'] = presentation.deck.key;
serviceAttrs['name'] = presentation.deck.name;
serviceAttrs['thumbnailkey'] = presentation.deck.thumbnail.key;
serviceAttrs['presentationid'] = presentation.key;
- v23discovery.Service serviceInfo = new v23discovery.Service()
- ..interfaceName = presentationInterfaceName
+ discovery.Service service = new discovery.Service()
+ ..interfaceName = _presentationInterfaceName
..instanceName = presentation.key
..attrs = serviceAttrs
..addrs = [presentation.syncgroupName, presentation.thumbnailSyncgroupName];
- v23discovery.AdvertiserProxy advertiser =
- new v23discovery.AdvertiserProxy.unbound();
- shell.connectToService(v23DiscoveryMojoUrl, advertiser);
- Future advertiseResponseFuture = advertiser.ptr.advertise(serviceInfo, null);
- _advertiseCalls[presentation.key] =
- new ProxyResponseFuturePair(advertiser, advertiseResponseFuture);
-
- v23discovery.AdvertiserAdvertiseResponseParams result =
- await advertiseResponseFuture;
- if (result.err != null) {
- throw result.err;
- }
+ _advertisers[presentation.key] = await _discoveryClient.advertise(service);
log.info('Advertised ${presentation.deck.name} under ${presentation.key}.');
}
-// Tracks advertisements that are in the middle of being stopped.
-Map<String, Future> _stoppingAdvertisingCalls = new Map<String, Future>();
Future stopAdvertising(String presentationId) async {
- if (!_advertiseCalls.containsKey(presentationId)) {
+ if (!_advertisers.containsKey(presentationId)) {
// Not advertised, nothing to stop.
- return new Future.value();
+ return;
}
-
- if (_stoppingAdvertisingCalls.containsKey(presentationId)) {
- // Already stopping, return the exiting call future.
- return _stoppingAdvertisingCalls[presentationId];
- }
-
- stop() async {
- v23discovery.AdvertiserAdvertiseResponseParams advertiserResponse =
- await _advertiseCalls[presentationId].responseFuture;
-
- await _advertiseCalls[presentationId]
- .proxy
- .ptr
- .stop(advertiserResponse.handle);
- await _advertiseCalls[presentationId].proxy.close();
- }
-
- Future stoppingCall = stop();
- _stoppingAdvertisingCalls[presentationId] = stoppingCall;
-
- stoppingCall.then((_) {
- _advertiseCalls.remove(presentationId);
- log.info('Stopped advertising ${presentationId}.');
- }).catchError((e) {
- _stoppingAdvertisingCalls.remove(presentationId);
- throw e;
- });
+ await _advertisers[presentationId].stop();
+ _advertisers.remove(presentationId);
}
-Future startScan() async {
- if (_scanCall != null) {
- // We are already scanning.
- return _scanCall.responseFuture;
+// TODO(aghassemi): Remove use once
+// https://github.com/vanadium/issues/issues/1071 is resolved
+// Currently we need to keep this mapping since discovery's lost event only
+// contains an auto generated instanceId which we need to map back to presentationId.
+Map<String, String> instanceIdToPresentationIdMap = new Map();
+
+// Transforms a stream of discovery services to PresentationAdvertisement model objects.
+StreamTransformer toPresentation = new StreamTransformer.fromHandlers(
+ handleData:
+ (discovery.Service s, EventSink<model.PresentationAdvertisement> sink) {
+ String key = s.attrs['presentationid'];
+ instanceIdToPresentationIdMap[s.instanceId] = key;
+ log.info('Found presentation ${s.attrs['name']} under $key.');
+ // Ignore our own advertised services.
+ if (_advertisers.containsKey(key)) {
+ log.info('Presentation ${s.attrs['name']} was advertised by us; ignoring.');
+ return;
}
- var scanner = new v23discovery.ScannerProxy.unbound();
- shell.connectToService(v23DiscoveryMojoUrl, scanner);
- v23discovery.ScanHandlerStub handlerStub =
- new v23discovery.ScanHandlerStub.unbound();
- handlerStub.impl = new ScanHandler();
+ model.Deck deck = new model.Deck(s.attrs['deckid'], s.attrs['name'],
+ new model.BlobRef(s.attrs['thumbnailkey']));
+ var syncgroupName = s.addrs[0];
+ var thumbnailSyncgroupName = s.addrs[1];
+ model.PresentationAdvertisement presentation =
+ new model.PresentationAdvertisement(
+ key, deck, syncgroupName, thumbnailSyncgroupName);
- var query = 'v.InterfaceName = "$presentationInterfaceName"';
- var scannerResponseFuture = scanner.ptr.scan(query, handlerStub);
- _scanCall = new ProxyResponseFuturePair(scanner, scannerResponseFuture);
+ sink.add(presentation);
+});
- v23discovery.ScannerScanResponseParams result = await scannerResponseFuture;
- if (result.err != null) {
- throw result.err;
+// Transforms a stream of instanceIds to presentationIds.
+StreamTransformer toPresentationId = new StreamTransformer.fromHandlers(
+ handleData: (String instanceId, EventSink<String> sink) {
+ String presentationId = instanceIdToPresentationIdMap[instanceId];
+ sink.add(presentationId);
+});
+
+Future<PresentationScanner> scan() async {
+ if (_scanner != null) {
+ return _scanner;
}
+ var query = 'v.InterfaceName = "$_presentationInterfaceName"';
+ _scanner = await _discoveryClient.scan(query);
+
log.info('Scan started.');
+ return new PresentationScanner._internal(
+ _scanner.onFound.transform(toPresentation),
+ _scanner.onLost.transform(toPresentationId));
}
-// Tracks whether we are already in the middle of stopping scan.
-Future _stoppingScanCall;
Future stopScan() async {
- if (_scanCall == null) {
- // No scan call has been made before or scan is already being stopped.
- return new Future.value();
+ if (_scanner == null) {
+ // No scan call has been made before.
+ return;
}
-
- if (_stoppingScanCall != null) {
- // Already stopping, return the exiting call future.
- return _stoppingScanCall;
- }
-
- stop() async {
- v23discovery.ScannerScanResponseParams scannerResponse =
- await _scanCall.responseFuture;
-
- await _scanCall.proxy.ptr.stop(scannerResponse.handle);
- await _scanCall.proxy.close();
- }
-
- _stoppingScanCall = stop();
-
- _stoppingScanCall.then((_) {
- _scanCall = null;
- log.info('Scan stopped.');
- }).catchError((e) {
- _stoppingScanCall = null;
- throw e;
- });
+ await _scanner.stop();
+ _scanner = null;
}
-class ScanHandler extends v23discovery.ScanHandler {
- Map<String, String> instanceIdToPresentationIdMap = new Map();
- found(v23discovery.Service s) async {
- String key = s.attrs['presentationid'];
- instanceIdToPresentationIdMap[s.instanceId] = key;
- log.info('Found presentation ${s.attrs['name']} under $key.');
- // Ignore our own advertised services.
- if (_advertiseCalls.containsKey(key)) {
- log.info(
- 'Presentation ${s.attrs['name']} was advertised by us; ignoring.');
- return;
- }
-
- model.Deck deck = new model.Deck(s.attrs['deckid'], s.attrs['name'],
- new model.BlobRef(s.attrs['thumbnailkey']));
- var syncgroupName = s.addrs[0];
- var thumbnailSyncgroupName = s.addrs[1];
- model.PresentationAdvertisement presentation =
- new model.PresentationAdvertisement(
- key, deck, syncgroupName, thumbnailSyncgroupName);
-
- _onFoundEmitter.add(presentation);
- }
-
- lost(String instanceId) {
- String presentationId = instanceIdToPresentationIdMap[instanceId];
- if (presentationId == null) {
- return;
- }
- log.info('Lost presentation $presentationId.');
- _onLostEmitter.add(presentationId);
- }
-}
-
-class ProxyResponseFuturePair<T1, T2> {
- final T1 proxy;
- final Future<T2> responseFuture;
- ProxyResponseFuturePair(this.proxy, this.responseFuture);
+class PresentationScanner {
+ Stream<model.PresentationAdvertisement> onFound;
+ Stream<String> onLost;
+ PresentationScanner._internal(this.onFound, this.onLost);
}
diff --git a/dart/lib/stores/syncbase/store.dart b/dart/lib/stores/syncbase/store.dart
index 6d08108..567bac5 100644
--- a/dart/lib/stores/syncbase/store.dart
+++ b/dart/lib/stores/syncbase/store.dart
@@ -76,7 +76,9 @@
void _triggerStateChange() => _stateChangeEmitter.add(_state);
Future _startScanningForPresentations() async {
- discovery.onFound.listen((model.PresentationAdvertisement newP) {
+ discovery.PresentationScanner scanner = await discovery.scan();
+
+ scanner.onFound.listen((model.PresentationAdvertisement newP) {
_state._presentationsAdvertisements[newP.key] = newP;
_triggerStateChange();
@@ -87,7 +89,7 @@
sb.joinSyncgroup(sgName);
});
- discovery.onLost.listen((String presentationId) {
+ scanner.onLost.listen((String presentationId) {
_state._presentationsAdvertisements.remove(presentationId);
_state._decks.values.forEach((_DeckState deck) {
if (deck.presentation != null &&
@@ -97,8 +99,6 @@
});
_triggerStateChange();
});
-
- discovery.startScan();
}
Future _getInitialValuesAndStartWatching(String table) async {
diff --git a/dart/pubspec.lock b/dart/pubspec.lock
index 1674050..347d63f 100644
--- a/dart/pubspec.lock
+++ b/dart/pubspec.lock
@@ -258,7 +258,7 @@
v23discovery:
description: v23discovery
source: hosted
- version: "0.0.9"
+ version: "0.0.10"
vector_math:
description: vector_math
source: hosted
diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml
index f0cccde..9e9b207 100644
--- a/dart/pubspec.yaml
+++ b/dart/pubspec.yaml
@@ -6,7 +6,7 @@
logging: ">=0.11.2 <0.12.0"
mojo_services: ">=0.4.5 <0.5.0"
syncbase: ">=0.0.27 <0.1.0"
- v23discovery: ">=0.0.9 < 0.1.0"
+ v23discovery: ">=0.0.10 < 0.1.0"
uuid: ">=0.5.0 <0.6.0"
dev_dependencies:
flutter_tools: