Client side of casting

Change-Id: I0ef7d18aff0c592552ec2317476e2c2c485a4ee3
diff --git a/examples/distro/app/src/flutter/lib/main.dart b/examples/distro/app/src/flutter/lib/main.dart
index c7fcffa..0fc0fd8 100644
--- a/examples/distro/app/src/flutter/lib/main.dart
+++ b/examples/distro/app/src/flutter/lib/main.dart
@@ -41,31 +41,11 @@
   void set data(final InputValue value) {
     _data = value;
     if (castTargetName != null) {
-      HostMessages.sendToHost('updateCast', getCastData());
+      HostMessages.sendToHost('updateCast', getCastData())
+        .catchError(_handleCastError);
     }
   }
 
-  String get castTargetName => _castTargetName;
-
-  void set castTargetName(final String value) {
-    if (_castTargetName != value) {
-      if (_castTargetName != null) {
-        HostMessages.sendToHost('terminateCast', castTargetName);
-      }
-
-      _castTargetName = value;
-
-      if (_castTargetName != null) {
-        HostMessages.sendToHost('initiateCast', getCastData());
-      }
-    }
-  }
-
-  _BakuDistroState() {
-    HostMessages.addMessageHandler('deviceOnline', _onDeviceOnline);
-    HostMessages.addMessageHandler('deviceOffline', _onDeviceOffline);
-  }
-
   String getCastData() {
     return JSON.encode({
       'name': castTargetName,
@@ -73,6 +53,35 @@
     });
   }
 
+  String get castTargetName => _castTargetName;
+
+  void set castTargetName(final String value) {
+    if (_castTargetName != value) {
+      initiateCast(value);
+    }
+  }
+
+  void terminateCast() {
+    if (_castTargetName != null) {
+      HostMessages.sendToHost('terminateCast', _castTargetName);
+      _castTargetName = null;
+    }
+  }
+
+  void initiateCast(final String target) {
+    terminateCast();
+    if (target != null) {
+      _castTargetName = target;
+      HostMessages.sendToHost('initiateCast', getCastData())
+        .catchError(_handleCastError);
+    }
+  }
+
+  _BakuDistroState() {
+    HostMessages.addMessageHandler('deviceOnline', _onDeviceOnline);
+    HostMessages.addMessageHandler('deviceOffline', _onDeviceOffline);
+  }
+
   @override
   Widget build(final BuildContext context) {
     final List<_Device> sortedDevices = devices.keys.map((name) =>
@@ -108,7 +117,7 @@
     } else {
       layout.add(new RaisedButton(
         child: new Text('Recall'),
-        onPressed: () => castTargetName = null
+        onPressed: terminateCast
       ));
     }
 
@@ -131,9 +140,15 @@
     setState(() {
       devices.remove(name);
       if (castTargetName == name) {
-        castTargetName = null;
+        terminateCast();
       }
     });
     return null;
   }
+
+  void _handleCastError(final Object e) {
+    Scaffold.of(context).showSnackBar(
+      new SnackBar(content: new Text(e.toString())));
+    terminateCast();
+  }
 }
diff --git a/examples/distro/app/src/main/java/io/baku/examples/distro/DistroActivity.java b/examples/distro/app/src/main/java/io/baku/examples/distro/DistroActivity.java
index cace0c5..836f960 100644
--- a/examples/distro/app/src/main/java/io/baku/examples/distro/DistroActivity.java
+++ b/examples/distro/app/src/main/java/io/baku/examples/distro/DistroActivity.java
@@ -11,13 +11,9 @@
 import android.content.Intent;
 import android.content.pm.ApplicationInfo;
 import android.os.Bundle;
-import android.support.annotation.NonNull;
 import android.support.v4.app.FragmentActivity;
 import android.util.Log;
-import android.widget.Toast;
 
-import com.google.android.gms.common.ConnectionResult;
-import com.google.android.gms.common.api.GoogleApiClient;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -40,15 +36,17 @@
 import io.v.android.VAndroidContexts;
 import io.v.android.security.BlessingsManager;
 import io.v.v23.security.Blessings;
+import io.v.v23.vdl.ClientSendStream;
 import io.v.v23.verror.VException;
 import io.v.v23.vom.VomUtil;
 import java8.util.Maps;
+import lombok.RequiredArgsConstructor;
 import rx.Subscription;
 
 /**
  * Activity representing the example 'app', a.k.a. the initiator/originator/master.
  */
-public class DistroActivity extends FragmentActivity implements GoogleApiClient.OnConnectionFailedListener {
+public class DistroActivity extends FragmentActivity {
     private static final String TAG = DistroActivity.class.getSimpleName();
     private static final Duration PING_TIMEOUT = Duration.standardSeconds(2);
     private static final long POLL_INTERVAL = 750;
@@ -90,11 +88,6 @@
                 });
     }
 
-    @Override
-    public void onConnectionFailed(final @NonNull ConnectionResult connectionResult) {
-        Toast.makeText(this, connectionResult.getErrorMessage(), Toast.LENGTH_LONG).show();
-    }
-
     private void onBlessingsAvailable(final Blessings blessings) {
         final Intent castIntent = new Intent(DistroActivity.this,
                 DistroAndroidService.class);
@@ -107,6 +100,10 @@
         startService(castIntent);
 
         subscription = startScanning();
+
+        flutterView.addOnMessageListener("initiateCast", this::initiateCast);
+        flutterView.addOnMessageListener("updateCast", this::updateCast);
+        flutterView.addOnMessageListener("terminateCast", this::terminateCast);
     }
 
     @Override
@@ -159,10 +156,28 @@
 
     private class ConnectionMonitor implements FutureCallback<String> {
         final String name;
-        final DistroClient client;
+
+        private final DistroClient client;
+
+        private ClientSendStream<State, Void> castStream;
 
         private ListenableFuture<String> poll;
 
+        private final FutureCallback<Object> failureHandler = new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(final Object result) {
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                if (isActive()) {
+                    flutterView.sendToFlutter("deviceOffline", name);
+                    terminateCast();
+                    clients.remove(name);
+                }
+            }
+        };
+
         public ConnectionMonitor(final String name) {
             this.name = name;
             client = DistroClientFactory.getDistroClient(name);
@@ -193,10 +208,22 @@
 
         @Override
         public void onFailure(final Throwable t) {
-            if (isActive()) {
-                flutterView.sendToFlutter("deviceOffline", name);
+            failureHandler.onFailure(t);
+        }
 
-                clients.remove(name);
+        public ConnectionMonitor initiateCast() {
+            castStream = client.cast(context.getVContext());
+            return this;
+        }
+
+        public void updateCast(final String data) {
+            Futures.addCallback(castStream.send(new State(data)), failureHandler);
+        }
+
+        public void terminateCast() {
+            if (castStream != null) {
+                Futures.addCallback(castStream.finish(), failureHandler);
+                castStream = null;
             }
         }
     }
@@ -206,4 +233,45 @@
                 .subscribe(name -> Maps.computeIfAbsent(clients, name, ConnectionMonitor::new),
                         t -> context.getErrorReporter().onError(R.string.err_scan, t));
     }
+
+    @RequiredArgsConstructor
+    private static class CastMessage {
+        public final String name, data;
+
+        public static CastMessage fromJson(final String json) {
+            final JSONObject message;
+            try {
+                message = new JSONObject(json);
+                return new CastMessage(
+                        message.getString("name"),
+                        message.getString("data"));
+            } catch (final JSONException e) {
+                throw new IllegalArgumentException(e);
+            }
+        }
+    }
+
+    private String initiateCast(final String json) {
+        final CastMessage message = CastMessage.fromJson(json);
+        clients.get(message.name)
+                .initiateCast()
+                .updateCast(message.data);
+        return null;
+    }
+
+    private String updateCast(final String json) {
+        final CastMessage message = CastMessage.fromJson(json);
+        clients.get(message.name)
+                .updateCast(message.data);
+        return null;
+    }
+
+    private String terminateCast(final String name) {
+        final ConnectionMonitor conn = clients.get(name);
+        if (conn != null) {
+            conn.terminateCast();
+        }
+
+        return null;
+    }
 }