luma_third_party: ViewHierarchy Service

While a user interacts with a device through STF,
the ViewHierarchy service issues a request for the
user's current Android View Hierarchy XML per each
GestureStart from the UI. View Hierarchy data is
sent over adb, forwarded to STF via port listeners,
and persisted.

Change-Id: Ia5b2f9de39d121c7661fb3154c5cc04fc90d4f46
diff --git a/crowdstf/lib/db/api.js b/crowdstf/lib/db/api.js
index 7531894..8bdcf8f 100644
--- a/crowdstf/lib/db/api.js
+++ b/crowdstf/lib/db/api.js
@@ -169,7 +169,7 @@
  */
 dbapi.saveDeviceEvent = function(deviceSerial, sessionId, eventName, imgId,
     timestamp, seq, contact, x, y, pressure, userEmail, userGroup, userIP,
-    userLastLogin, userName) {
+    userLastLogin, userName, viewHierarchy) {
   var deviceEventDTO = {
     serial: deviceSerial,
     sessionId: sessionId,
@@ -184,7 +184,8 @@
     userGroup: userGroup,
     userIP: userIP,
     userLastLogin: userLastLogin,
-    userName: userName
+    userName: userName,
+    viewHierarchy: viewHierarchy ? viewHierarchy : ''
   };
 
   return db.run(r.table('deviceEvents').insert(deviceEventDTO, {
diff --git a/crowdstf/lib/units/device/index.js b/crowdstf/lib/units/device/index.js
index 08be946..233e742 100644
--- a/crowdstf/lib/units/device/index.js
+++ b/crowdstf/lib/units/device/index.js
@@ -26,6 +26,7 @@
         .dependency(require('./plugins/store'))
         .dependency(require('./plugins/clipboard'))
         .dependency(require('./plugins/logcat'))
+        .dependency(require('./plugins/viewbridge'))
         .dependency(require('./plugins/mute'))
         .dependency(require('./plugins/shell'))
         .dependency(require('./plugins/touch'))
diff --git a/crowdstf/lib/units/device/plugins/viewbridge.js b/crowdstf/lib/units/device/plugins/viewbridge.js
new file mode 100644
index 0000000..ba1f282
--- /dev/null
+++ b/crowdstf/lib/units/device/plugins/viewbridge.js
@@ -0,0 +1,152 @@
+var syrup = require('stf-syrup');
+var Promise = require('bluebird');
+var net = require('net');
+var logger = require('../../../util/logger');
+var wire = require('../../../wire');
+var wireutil = require('../../../wire/util');
+var lifecycle = require('../../../util/lifecycle');
+var viewBridgePorts = require('../viewbridgeports');
+
+// Localhost binding complies with STF spec, placing
+// the device threads on the same host as adb.
+const ADB_VIEW_SERVER_HOST = '127.0.0.1';
+
+module.exports = syrup.serial()
+    .dependency(require('../support/adb'))
+    .dependency(require('../support/router'))
+    .dependency(require('../support/push'))
+    .dependency(require('./group'))
+    .define(function(options, adb, router, push, group) {
+      // This odd plugin/object setup follows the OpenSTF open source
+      // plugin pattern.
+      var log = logger.createLogger('device:plugins:viewbridge');
+      var plugin = Object.create(null);
+      var activeViewBridge = null;
+
+      var openViewBridge = function() {
+        return new Promise(function(resolve, reject) {
+          log.info('adb view bridge opening stream.');
+
+          var activeViewBridge = new net.Socket();
+          var port = viewBridgePorts[options.serial] || viewBridgePorts.default;
+          activeViewBridge.connect(port, ADB_VIEW_SERVER_HOST, function() {
+            resolve(activeViewBridge);
+          });
+
+          activeViewBridge.on('error', function(err) {
+            reject();
+            log.error('Unable to access adb view bridge');
+            throw err;
+          });
+        });
+      };
+
+      // The plugin start implementation follows the OpenSTF
+      // plugin pattern of deferred stop-start
+      plugin.start = function() {
+        return group.get()
+            .then(function(group) {
+              return plugin.stop()
+                  .then(function() {
+                    log.info('Starting view bridge.');
+                    return openViewBridge(options.serial);
+                  })
+                  .then(function(logcat) {
+                    activeViewBridge = logcat;
+
+                    function entryListener(entry) {
+                      try {
+                        push.send([
+                          group.group,
+                          wireutil.envelope(
+                              new wire.DeviceViewBridgeEntryMessage(
+                              options.serial,
+                              new Date().getTime(),
+                              entry.toString()
+                              )
+                          )
+                        ]);
+                      } catch (err) {
+                        log.warn('View bridge socket emit failure.');
+                      }
+                    }
+
+                    activeViewBridge.on('data', entryListener);
+
+                    return plugin.reset();
+                  });
+            });
+      };
+
+      // The view bridge writes a 'd' character over tcp
+      // to request a dump from the on-device view hierarchy dump service.
+      plugin.getSeq = Promise.method(function(seq) {
+        if (plugin.isRunning()) {
+          activeViewBridge.write('d ' + seq + '\n');
+        }
+      });
+
+      plugin.stop = Promise.method(function() {
+        if (plugin.isRunning()) {
+          log.info('Stopping view bridge.');
+          activeViewBridge.destroy();
+          activeViewBridge = null;
+        }
+      });
+
+      plugin.reset = Promise.method(function(filters) {
+        filters = null;
+      });
+
+      plugin.isRunning = function() {
+        return !!activeViewBridge && activeViewBridge.destroy;
+      };
+
+      lifecycle.observe(plugin.stop);
+      group.on('leave', plugin.stop);
+
+      router.on(wire.ViewBridgeStartMessage, function(channel, message) {
+        var reply = wireutil.reply(options.serial);
+        plugin.start(message.filters)
+            .then(function() {
+              push.send([
+                channel,
+                reply.okay('success')
+              ]);
+            }).catch(function(err) {
+              log.warn('Unable to open view bridge.', err.stack);
+              push.send([
+                channel,
+                reply.fail('fail')
+              ]);
+            });
+          })
+          .on(wire.ViewBridgeGetMessage, function(channel, message) {
+            var reply = wireutil.reply(options.serial);
+
+            plugin.getSeq(message.seq);
+            push.send([
+              channel,
+              reply.okay('success')
+            ]);
+          })
+          .on(wire.ViewBridgeStopMessage, function(channel) {
+            var reply = wireutil.reply(options.serial);
+            plugin.stop()
+                .then(function() {
+                  push.send([
+                    channel,
+                    reply.okay('success')
+                  ]);
+                })
+                .catch(function(err) {
+                  log.warn('Failed to stop view bridge', err.stack);
+                  push.send([
+                    channel,
+                    reply.fail('fail')
+                  ]);
+                });
+          });
+
+      return plugin;
+    });
diff --git a/crowdstf/lib/units/device/resources/service.js b/crowdstf/lib/units/device/resources/service.js
index bc3f737..cb41782 100644
--- a/crowdstf/lib/units/device/resources/service.js
+++ b/crowdstf/lib/units/device/resources/service.js
@@ -3,6 +3,9 @@
 var syrup = require('stf-syrup')
 var ProtoBuf = require('protobufjs')
 var semver = require('semver')
+var adbkit = require('adbkit');
+var request = require('request');
+var viewBridgePorts = require('../viewbridgeports');
 
 var pathutil = require('../../../util/pathutil')
 var streamutil = require('../../../util/streamutil')
@@ -29,6 +32,35 @@
       }
     }
 
+    function initViewServer() {
+      log.info('Running CMD: adb shell dumpsys activity start-view-server');
+      adb.shell(options.serial, 'dumpsys activity start-view-server')
+          .then(adbkit.util.readAll)
+          .then(function(output) {
+            log.info('Success starting view server, messages: [%s]',
+                output.toString().trim());
+          });
+    }
+
+    function forwardTcpXml() {
+      log.info('Creating adb tcp view bridge port forward.');
+      var localPort = 'tcp:' + viewBridgePorts[options.serial] ||
+          viewBridgePorts.default;
+      var remotePort = 'tcp:' + viewBridgePorts.ANDROID_REMOTE;
+
+      // Remove any existing forwards per serial, then add new forward.
+      adb.shell(options.serial, 'forward --remove ' + localPort)
+          .then(function() {
+            log.info('Success removing ports for device %s', options.serial);
+            adb.forward(options.serial, localPort, remotePort)
+                .then(function(output) {
+                  log.info('Forwarded tcp xml port %s, messages: [%s]',
+                      localPort,
+                      output.toString().trim());
+                });
+          });
+    }
+
     function getPath() {
       return adb.shell(options.serial, ['pm', 'path', resource.pkg])
         .timeout(10000)
@@ -96,6 +128,8 @@
         })
     }
 
+    initViewServer();
+    forwardTcpXml();
     return install()
       .then(function(path) {
         log.info('STFService up to date')
diff --git a/crowdstf/lib/units/device/viewbridgeports.js b/crowdstf/lib/units/device/viewbridgeports.js
new file mode 100644
index 0000000..4dc15c7
--- /dev/null
+++ b/crowdstf/lib/units/device/viewbridgeports.js
@@ -0,0 +1,5 @@
+//TODO(hibschman@): create device manager and migrate away from static serials.
+module.exports = {
+  'default': 1698,
+  'ANDROID_REMOTE': 1699
+};
diff --git a/crowdstf/lib/units/processor/index.js b/crowdstf/lib/units/processor/index.js
index 7b022d9..cb442af 100644
--- a/crowdstf/lib/units/processor/index.js
+++ b/crowdstf/lib/units/processor/index.js
@@ -189,6 +189,9 @@
     .on(wire.DeviceLogcatEntryMessage, function(channel, message, data) {
       appDealer.send([channel, data])
     })
+    .on(wire.DeviceViewBridgeEntryMessage, function(channel, message, data) {
+      appDealer.send([channel, data]);
+    })
     .on(wire.AirplaneModeEvent, function(channel, message, data) {
       dbapi.setDeviceAirplaneMode(message.serial, message.enabled)
       appDealer.send([channel, data])
diff --git a/crowdstf/lib/units/websocket/deviceeventstore.js b/crowdstf/lib/units/websocket/deviceeventstore.js
index 5d597cd..bdddc50 100644
--- a/crowdstf/lib/units/websocket/deviceeventstore.js
+++ b/crowdstf/lib/units/websocket/deviceeventstore.js
@@ -1,9 +1,9 @@
-var dbapi = require('../../db/api')
+var dbapi = require('../../db/api');
 
 function DeviceEventStore() {
 }
 
-DeviceEventStore.prototype.storeEvent = function (eventName, eventData) {
+DeviceEventStore.prototype.storeEvent = function(eventName, eventData) {
   var imgId = eventData.imgId;
   var serial = eventData.serial;
   var timestamp = eventData.timestamp;
@@ -12,11 +12,13 @@
   var userGroup = eventData.userGroup;
   var userIP = eventData.userIP;
   var userLastLogin = eventData.userLastLogin;
-  var userName = eventData.userName
+  var userName = eventData.userName;
+  var viewHierarchy = eventData.viewHierarchy;
 
   dbapi.saveDeviceEvent(serial, sessionId, eventName, imgId, timestamp,
-      eventData.seq, eventData.contact, eventData.x, eventData.y, eventData.pressure,
-      userEmail, userGroup, userIP, userLastLogin, userName)
-}
+      eventData.seq, eventData.contact, eventData.x, eventData.y,
+      eventData.pressure, userEmail, userGroup, userIP, userLastLogin,
+      userName, viewHierarchy);
+};
 
-module.exports = DeviceEventStore
+module.exports = DeviceEventStore;
diff --git a/crowdstf/lib/units/websocket/index.js b/crowdstf/lib/units/websocket/index.js
index 4b3b465..df8af44 100644
--- a/crowdstf/lib/units/websocket/index.js
+++ b/crowdstf/lib/units/websocket/index.js
@@ -26,6 +26,11 @@
 var deviceEventStore = new DeviceEventStore();
 var layoutCaptureService = require('./layoutcaptureservice')
 
+const START_LOGCAT_DELIM = 'RicoBegin';
+const START_DELIM_CHAR = START_LOGCAT_DELIM.substr(0, 1);
+const END_LOGCAT_DELIM = 'RicoEnd';
+const START_DELIM_LEN = START_LOGCAT_DELIM.length;
+const VIEW_JSON_END_DELIMITER = 'RICO_JSON_END';
 
 module.exports = function(options) {
   var log = logger.createLogger('websocket')
@@ -35,6 +40,8 @@
       , transports: ['websocket']
       })
   var channelRouter = new events.EventEmitter()
+  var viewHierarchyJSON = '';
+  var viewResHandler = null;
 
   // Output
   var push = zmqutil.socket('push')
@@ -226,7 +233,28 @@
         socket.emit('tx.done', channel.toString(), message)
       })
       .on(wire.DeviceLogcatEntryMessage, function(channel, message) {
-        socket.emit('logcat.entry', message)
+        var messageStr = message.message;
+        var serial = message.serial;
+        var date = message.date;
+
+        // Break the logcat messages out of their delimiters and save.
+        if (messageStr.indexOf(START_LOGCAT_DELIM) > -1) {
+          messageStr.split(START_LOGCAT_DELIM).forEach(function(item) {
+            if (item.indexOf(END_LOGCAT_DELIM) > -1) {
+              var logcatMessage = item.split(END_LOGCAT_DELIM)[0];
+
+              log.info('Saving logcat message: %s %s "%s"', serial, date,
+                  logcatMessage);
+              dbapi.saveLogcat(message.serial, date, logcatMessage);
+            }
+          });
+        }
+      })
+      .on(wire.DeviceViewBridgeEntryMessage, function(channel, message) {
+        viewHierarchyJSON += message.message;
+        if (message.message.indexOf(VIEW_JSON_END_DELIMITER) > -1) {
+          viewResHandler(viewHierarchyJSON);
+        }
       })
       .on(wire.AirplaneModeEvent, function(channel, message) {
         socket.emit('device.change', {
@@ -519,20 +547,27 @@
 
         })
         .on('input.gestureStart', function(channel, data) {
-          layoutCaptureService.enqueue(wire.GestureStartMessage, function(xmlRes) {
-            console.log("Received XML:", xmlRes)
-
-            data.xml = xmlRes;
-            deviceEventStore.storeEvent('input.gestureStart', data);
-
-            push.send([
-              channel
-              , wireutil.envelope(new wire.GestureStartMessage(
-                data.seq
+          layoutCaptureService.enqueue(wire.GestureStartMessage, function() {
+            push.send([channel,
+              wireutil.envelope(new wire.GestureStartMessage(
+              data.seq
               ))
-            ])
-          });
+            ]);
+          }, function(callback) {
+            viewHierarchyJSON = '';
+            viewResHandler = function(viewHierarchy) {
+              data.viewHierarchy = viewHierarchy;
+              deviceEventStore.storeEvent('input.gestureStart', data);
+              callback();
+            };
 
+            // Send a request to the TCP view bridge.
+            push.send([channel,
+              wireutil.envelope(new wire.ViewBridgeGetMessage(
+                  data.imgId.split('_')[1]
+              ))
+            ]);
+          });
         })
         .on('input.gestureStop', function(channel, data) {
           layoutCaptureService.enqueue(wire.GestureStopMessage, function() {
diff --git a/crowdstf/lib/units/websocket/layoutcaptureservice.js b/crowdstf/lib/units/websocket/layoutcaptureservice.js
index 8ad7696..c34a694 100644
--- a/crowdstf/lib/units/websocket/layoutcaptureservice.js
+++ b/crowdstf/lib/units/websocket/layoutcaptureservice.js
@@ -1,16 +1,19 @@
-var wire = require('../../wire')
+var wire = require('../../wire');
+var net = require('net');
 
 function LayoutCaptureService() {
   this.actionQueue = [];
 }
 
-LayoutCaptureService.prototype.enqueue = function(wireEvent, actionFn) {
+LayoutCaptureService.prototype.enqueue = function(wireEvent, actionFn,
+                                                  fetchView) {
   this.actionQueue.push({
     wireEvent: wireEvent,
-    actionFn: actionFn
+    actionFn: actionFn,
+    fetchView: fetchView
   });
   this.checkStartCaptures(wireEvent);
-}
+};
 
 LayoutCaptureService.prototype.dequeue = function() {
   if (this.actionQueue.length > 0) {
@@ -18,52 +21,37 @@
   } else {
     return null;
   }
-}
+};
 
 LayoutCaptureService.prototype.checkStartCaptures = function() {
   if (this.actionQueue.length > 0 && !this.processing) {
     this.processing = true;
-    this.processStr = "";
+    layoutCaptureService.processStr = '';
     var nextItem = function() {
       var eventActionObj = layoutCaptureService.dequeue();
       if (eventActionObj) {
-        layoutCaptureService.processStr += " (" +
-          eventActionObj.wireEvent.$code + ") ";
+        layoutCaptureService.processStr += ' (' +
+            eventActionObj.wireEvent.$code + ') ';
         if (eventActionObj.wireEvent === wire.GestureStartMessage) {
-          console.log("Queueing gesture-start event")
-
-          layoutCaptureService.fetchLayout(function(err, res) {
+          eventActionObj.fetchView(function(err, layoutJSON) {
             if (err) {
               console.error(err);
             } else {
-              eventActionObj.actionFn(res)
-              nextItem()
+              eventActionObj.actionFn(layoutJSON);
+              nextItem();
             }
           });
         } else {
-          if (eventActionObj.wireEvent === wire.GestureStopMessage) {
-            console.log("Queueing gesture-stop event")
-          }
-
-          eventActionObj.actionFn()
-          nextItem()
+          eventActionObj.actionFn();
+          nextItem();
         }
       } else {
         layoutCaptureService.processing = false;
       }
-    }
+    };
 
     nextItem();
   }
-}
-
-LayoutCaptureService.prototype.fetchLayout = function(callback) {
-  //TODO(hibschman@): swap out this delay simulation stub with Device XML Fetch
-  var rand = Math.floor(Math.random() * (300 - 100 + 1) + 100);
-  console.log("Delay", rand, "millis")
-  setTimeout(function() {
-    callback(null, "<xml layout='mock'></xml>");
-  }, rand)
 };
 
 var layoutCaptureService = new LayoutCaptureService();
diff --git a/crowdstf/lib/wire/wire.proto b/crowdstf/lib/wire/wire.proto
index 868e507..6f5d0c3 100644
--- a/crowdstf/lib/wire/wire.proto
+++ b/crowdstf/lib/wire/wire.proto
@@ -6,6 +6,7 @@
   DeviceAbsentMessage        = 1;
   DeviceIdentityMessage      = 2;
   DeviceLogcatEntryMessage   = 3;
+  DeviceViewBridgeEntryMessage = 92;
   DeviceLogMessage           = 4;
   DeviceReadyMessage         = 5;
   DevicePresentMessage       = 6;
@@ -48,7 +49,10 @@
   ForwardCreateMessage       = 37;
   ForwardRemoveMessage       = 38;
   LogcatStartMessage         = 39;
+  ViewBridgeStartMessage     = 93;
   LogcatStopMessage          = 40;
+  ViewBridgeStopMessage      = 94;
+  ViewBridgeGetMessage       = 95;
   BrowserOpenMessage         = 41;
   BrowserClearMessage        = 42;
   AirplaneModeEvent          = 43;
@@ -365,10 +369,12 @@
   required string serial = 1;
   required double date = 2;
   required uint32 pid = 3;
-  required uint32 tid = 4;
-  required uint32 priority = 5;
-  required string tag = 6;
-  required string message = 7;
+}
+
+message DeviceViewBridgeEntryMessage {
+  required string serial = 1;
+  required double date = 2;
+  required string message = 3;
 }
 
 message LogcatFilter {