# blob: 2d5f2ed7d28da1bf1dbdf6c08eadbed4a98a1ed8 [file] [log] [blame]
'''
Copyright (C) 2012-2015 Diego Torres Milano
Created on Dec 1, 2012
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@author: Diego Torres Milano
'''
import threading
__version__ = '11.5.9'
import sys
import warnings
# Warn early when the module is loaded by the 'monkeyrunner' launcher instead of
# a regular python interpreter.
if sys.executable and 'monkeyrunner' in sys.executable:
    warnings.warn(
        '''
You should use a 'python' interpreter, not 'monkeyrunner' for this module
''', RuntimeWarning)
import string
import datetime
import struct
import cStringIO as StringIO
import socket
import time
import re
import signal
import os
import types
import platform
from com.dtmilano.android.window import Window
from com.dtmilano.android.common import _nd, _nh, _ns, obtainPxPy, obtainVxVy, \
obtainVwVh, profileStart, profileEnd
from com.dtmilano.android.adb.androidkeymap import KEY_MAP
# Master debug switch: the specific DEBUG_* flags below are only honored when it is on
DEBUG = False
DEBUG_SHELL = DEBUG and False
DEBUG_TOUCH = DEBUG and False
DEBUG_LOG = DEBUG and False
DEBUG_WINDOWS = DEBUG and False
DEBUG_COORDS = DEBUG and False
DEBUG_IMAGE_ROTATION = DEBUG and False
# Set to True by takeSnapshot() once PIL has been imported
PIL_AVAILABLE = False
PROFILE = False
# adb server location, overridable through the environment.
# A plain lookup with a default replaces the former bare 'except:' clause.
HOSTNAME = os.environ.get('ANDROID_ADB_SERVER_HOST', 'localhost')
# A non numeric port still raises ValueError, exactly as before
PORT = int(os.environ.get('ANDROID_ADB_SERVER_PORT', 5037))
# adb server status words
OKAY = 'OKAY'
FAIL = 'FAIL'
# event types
UP = 0
DOWN = 1
DOWN_AND_UP = 2
# default timeout, in seconds
TIMEOUT = 15
WIFI_SERVICE = 'wifi'
# some device properties
VERSION_SDK_PROPERTY = 'ro.build.version.sdk'
VERSION_RELEASE_PROPERTY = 'ro.build.version.release'
class Device:
    '''
    A device entry: serial number, status and optional qualifiers.
    '''

    @staticmethod
    def factory(_str):
        '''
        Creates a L{Device} from a whitespace separated description.

        @param _str: the text, split in at most 3 fields: serialno, status and qualifiers
        @return: the L{Device}
        '''
        if DEBUG:
            print >> sys.stderr, "Device.factory(", _str, ")"
            print >> sys.stderr, " _str=", repr(_str)
            print >> sys.stderr, " _str=", _str.replace(' ', '_')
        fields = _str.split(None, 2)
        if DEBUG:
            print >> sys.stderr, "values=", fields
        return Device(*fields)

    def __init__(self, serialno, status, qualifiers=None):
        '''
        @param serialno: the device serial number
        @param status: the device status
        @param qualifiers: the remaining text of the description, if any
        '''
        self.serialno, self.status, self.qualifiers = serialno, status, qualifiers

    def __str__(self):
        pieces = ["<<<", self.serialno, ", ", self.status, ", %s>>>" % self.qualifiers]
        return ''.join(pieces)
class WifiManager:
    '''
    Simulates Android WifiManager.

    @see: http://developer.android.com/reference/android/net/wifi/WifiManager.html
    '''

    WIFI_STATE_DISABLING = 0
    WIFI_STATE_DISABLED = 1
    WIFI_STATE_ENABLING = 2
    WIFI_STATE_ENABLED = 3
    WIFI_STATE_UNKNOWN = 4

    WIFI_IS_ENABLED_RE = re.compile('Wi-Fi is enabled')
    WIFI_IS_DISABLED_RE = re.compile('Wi-Fi is disabled')

    def __init__(self, device):
        '''
        @param device: the object whose C{shell(cmd)} method is used to run C{dumpsys wifi}
        '''
        self.device = device

    def getWifiState(self):
        '''
        Gets the Wi-Fi enabled state.

        Only the first line of the C{dumpsys wifi} output is inspected.

        @return: One of WIFI_STATE_DISABLED, WIFI_STATE_ENABLED or WIFI_STATE_UNKNOWN
        '''
        result = self.device.shell('dumpsys wifi')
        # With no output there is no first line to look at: fall back to an empty
        # state instead of failing on an unbound variable.
        state = result.splitlines()[0] if result else ''
        if self.WIFI_IS_ENABLED_RE.match(state):
            return self.WIFI_STATE_ENABLED
        if self.WIFI_IS_DISABLED_RE.match(state):
            return self.WIFI_STATE_DISABLED
        sys.stderr.write("UNKNOWN WIFI STATE: %s\n" % state)
        return self.WIFI_STATE_UNKNOWN
class AdbClient:
def __init__(self, serialno=None, hostname=HOSTNAME, port=PORT, settransport=True, reconnect=True,
             ignoreversioncheck=False, timeout=TIMEOUT):
    '''
    Connects to the adb server, checks its version and optionally selects the device.

    @param serialno: the device serial number (used as a regular expression when the transport is set)
    @param hostname: the adb server host
    @param port: the adb server port
    @param settransport: if C{True} and C{serialno} is not C{None}, set the transport, read the SDK
                         version and initialize the display properties
    @param reconnect: whether L{shell} reconnects after running a command
    @param ignoreversioncheck: do not fail if the adb server version is not one of the expected ones
    @param timeout: the socket timeout in seconds
    '''
    self.Log = AdbClient.__Log(self)
    self.serialno = serialno
    self.hostname = hostname
    self.port = port
    self.timeout = timeout
    self.reconnect = reconnect
    self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
    self.checkVersion(ignoreversioncheck)
    # Build properties
    self.build = {}
    # Cached display info. Reset it to None to force refetching display info
    self.__displayInfo = None
    # The map containing the device's physical display properties: width, height and density
    self.display = {}
    self.isTransportSet = False
    if settransport and serialno is not None:
        self.__setTransport()
        self.build[VERSION_SDK_PROPERTY] = int(self.__getProp(VERSION_SDK_PROPERTY))
        self.initDisplayProperties()
@staticmethod
def alarmHandler(signum, frame):
if signum == signal.SIGALRM:
raise IOError("Socket timeout")
raise RuntimeError("Signal received: %d" % signum)
@staticmethod
def setAlarm(timeout):
    '''
    Arms (or, with 0, cancels) a C{SIGALRM} handled by L{alarmHandler}.

    Does nothing on Windows and when not running in the thread named 'MainThread'.

    @param timeout: the alarm delay in seconds
    '''
    if platform.system().startswith('Windows'):  # alarm is not implemented in Windows
        return
    if DEBUG:
        print >> sys.stderr, "setAlarm(%d)" % timeout
    inMainThread = threading.current_thread().getName() == 'MainThread'
    if inMainThread:
        signal.signal(signal.SIGALRM, AdbClient.alarmHandler)
        signal.alarm(timeout)
def setSerialno(self, serialno):
if self.isTransportSet:
raise ValueError("Transport is already set, serialno cannot be set once this is done.")
self.serialno = serialno
self.__setTransport()
self.build[VERSION_SDK_PROPERTY] = int(self.__getProp(VERSION_SDK_PROPERTY))
def setReconnect(self, val):
self.reconnect = val
@staticmethod
def connect(hostname, port, timeout=TIMEOUT):
    '''
    Opens a TCP connection to the adb server.

    @param hostname: the adb server host
    @param port: the adb server port
    @param timeout: the socket timeout in seconds
    @return: the connected socket
    @raise RuntimeError: if the connection cannot be established
    '''
    if DEBUG:
        print >> sys.stderr, "AdbClient.connect(%s, %s, %s)" % (hostname, port, timeout)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_LINGER: Idea proposed by kysersozelee (#173)
    l_onoff = 1
    l_linger = 0
    s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                 struct.pack('ii', l_onoff, l_linger))
    s.settimeout(timeout)
    try:
        s.connect((hostname, port))
    except socket.error as ex:
        # Do not leak the descriptor of a socket that never got connected
        s.close()
        # Report the host we tried to reach, not the socket object
        raise RuntimeError(
            "ERROR: Connecting to %s:%d: %s.\nIs adb running on your computer?" % (hostname, port, ex))
    return s
def close(self):
    '''
    Closes the socket, if there is one.
    '''
    if DEBUG:
        print >> sys.stderr, "Closing socket...", self.socket
    sock = self.socket
    if sock:
        sock.close()
def __del__(self):
try:
self.close()
except:
pass
def __send(self, msg, checkok=True, reconnect=False):
    '''
    Sends a request: 4 hex digits with the length followed by the UTF-8 payload.

    Requests not starting with C{host:} need the transport, which is set here if it
    has not been set yet.

    @param msg: the request
    @param checkok: read and verify the status answer after sending
    @param reconnect: open a new connection and set the transport again after sending
    '''
    if DEBUG:
        print >> sys.stderr, "__send(%s, checkok=%s, reconnect=%s)" % (msg, checkok, reconnect)
    if msg.startswith('host:'):
        self.checkConnected()
    elif not self.isTransportSet:
        self.__setTransport()
    b = bytearray(msg, 'utf-8')
    # sendall: a plain send() may write only part of the request and the
    # remaining bytes would be silently dropped
    self.socket.sendall('%04X%s' % (len(b), b))
    if checkok:
        self.__checkOk()
    if reconnect:
        if DEBUG:
            print >> sys.stderr, " __send: reconnecting"
        self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
        self.__setTransport()
def __receive(self, nob=None):
    '''
    Receives exactly C{nob} bytes from the socket.

    @param nob: the number of bytes to read. If C{None}, it is read from the socket
                as 4 hex digits preceding the data.
    @return: the data received
    @raise RuntimeError: if not connected or if the connection is closed before
                         C{nob} bytes were received
    '''
    if DEBUG:
        print >> sys.stderr, "__receive()"
    self.checkConnected()
    if nob is None:
        # A single recv(4) may return fewer than the 4 digits of the length
        nob = int(self.__readExactly(self.socket, 4), 16)
    if DEBUG:
        print >> sys.stderr, " __receive: receiving", nob, "bytes"
    recv = bytearray()
    nr = 0
    while nr < nob:
        chunk = self.socket.recv(min((nob - nr), 4096))
        l = len(chunk)
        if DEBUG:
            print >> sys.stderr, "l=", l, "nr=", nr
        if not chunk:
            # recv() returns an empty string once the peer closes: without this
            # check the loop would spin forever
            raise RuntimeError("ERROR: Connection closed after receiving %d of %d bytes" % (nr, nob))
        recv.extend(chunk)
        nr += l
    if DEBUG:
        print >> sys.stderr, " __receive: returning len=", len(recv)
    return str(recv)
def __checkOk(self):
    '''
    Reads the 4 bytes status answer and verifies it is C{OKAY}.

    @return: C{True}
    @raise RuntimeError: if not connected or if the answer is not C{OKAY}
    '''
    if DEBUG:
        print >> sys.stderr, "__checkOk()"
    self.checkConnected()
    self.setAlarm(TIMEOUT)
    try:
        # The first read is inside the try block so the alarm is always cancelled,
        # even if recv() itself fails
        recv = self.socket.recv(4)
        if DEBUG:
            print >> sys.stderr, " __checkOk: recv=", repr(recv)
        if recv != OKAY:
            error = self.socket.recv(1024)
            if error.startswith('0049'):
                raise RuntimeError(
                    "ERROR: This computer is unauthorized. Please check the confirmation dialog on your device.")
            else:
                raise RuntimeError("ERROR: %s %s" % (repr(recv), error))
    finally:
        self.setAlarm(0)
    if DEBUG:
        print >> sys.stderr, " __checkOk: returning True"
    return True
def checkConnected(self):
    '''
    Verifies there is a socket.

    @return: C{True}
    @raise RuntimeError: if there is no socket
    '''
    if DEBUG:
        print >> sys.stderr, "checkConnected()"
    if self.socket:
        if DEBUG:
            print >> sys.stderr, " checkConnected: returning True"
        return True
    raise RuntimeError("ERROR: Not connected")
def checkVersion(self, ignoreversioncheck=False, reconnect=True):
    '''
    Asks the adb server for its version and compares it against the known ones.

    @param ignoreversioncheck: do not raise if the version is not a known one
    @param reconnect: open a new connection after the check
    @raise RuntimeError: if the version is not a known one and it is not ignored
    '''
    if DEBUG:
        print >> sys.stderr, "checkVersion(reconnect=%s) ignoreversioncheck=%s" % (reconnect, ignoreversioncheck)
    self.__send('host:version', reconnect=False)
    # HACK: MSG_WAITALL not available on windows
    # version = self.socket.recv(8, socket.MSG_WAITALL)
    version = self.__readExactly(self.socket, 8)
    VALID_ADB_VERSIONS = ["00040024", "00040023", "00040020", "0004001f"]
    unknownVersion = version not in VALID_ADB_VERSIONS
    if unknownVersion and not ignoreversioncheck:
        raise RuntimeError(
            "ERROR: Incorrect ADB server version %s (expecting one of %s)" % (version, VALID_ADB_VERSIONS))
    if reconnect:
        self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
def __setTransport(self):
    '''
    Selects the device that subsequent requests are sent to.

    C{self.serialno} is used as a regular expression matched against the beginning
    of the serial numbers of the listed devices. The first one matching is selected
    and C{self.serialno} is replaced by its actual serial number.

    @raise ValueError: if the serial number is not set
    @raise RuntimeError: if not connected, if there are no devices or if none matches
    '''
    if DEBUG:
        print >> sys.stderr, "__setTransport()"
    if not self.serialno:
        raise ValueError("serialno not set, empty or None")
    self.checkConnected()
    serialnoRE = re.compile(self.serialno)
    devices = self.getDevices()
    # getDevices() returns None when the server answers with an error, so test
    # for "no devices" without calling len()
    if not devices:
        raise RuntimeError("ERROR: There are no connected devices")
    for device in devices:
        if serialnoRE.match(device.serialno):
            break
    else:
        raise RuntimeError("ERROR: couldn't find device that matches '%s' in %s" % (self.serialno, devices))
    self.serialno = device.serialno
    msg = 'host:transport:%s' % self.serialno
    if DEBUG:
        print >> sys.stderr, " __setTransport: msg=", msg
    self.__send(msg, reconnect=False)
    self.isTransportSet = True
def __checkTransport(self):
if not self.isTransportSet:
raise RuntimeError("ERROR: Transport is not set")
def __readExactly(self, sock, size):
    '''
    Reads C{size} bytes from the socket, looping over short reads.

    @param sock: the socket to read from
    @param size: the number of bytes wanted
    @return: the data read, which is shorter than C{size} only if the connection was closed
    '''
    if DEBUG:
        # Print the socket being read, not the 'socket' module
        print >> sys.stderr, "__readExactly(socket=%s, size=%d)" % (sock, size)
    chunks = []
    remaining = size
    while remaining > 0:
        data = sock.recv(remaining)
        if not data:
            break
        chunks.append(data)
        remaining -= len(data)
    return ''.join(chunks)
def getDevices(self):
    '''
    Gets the devices listed by the adb server (C{host:devices-l}).

    A new connection is opened after the list is received.

    @return: the list of L{Device} or C{None} if the server did not answer C{OKAY}
    '''
    if DEBUG:
        print >> sys.stderr, "getDevices()"
    self.__send('host:devices-l', checkok=False)
    try:
        self.__checkOk()
    except RuntimeError as ex:
        print >> sys.stderr, "**ERROR:", ex
        return None
    devices = [Device.factory(line) for line in self.__receive().splitlines()]
    self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
    return devices
def shell(self, cmd=None):
    '''
    Runs a shell command on the device or opens an interactive shell.

    @param cmd: the command to run. If it is empty or C{None} an interactive shell is requested.
    @return: the output of the command or, for the interactive shell, a file object
             reading from the socket
    @raise RuntimeError: if the transport is not set
    '''
    if DEBUG_SHELL:
        print >> sys.stderr, "shell(cmd=%s)" % cmd
    self.__checkTransport()
    if cmd:
        self.__send('shell:%s' % cmd, checkok=True, reconnect=False)
        # Collect the chunks and join them once instead of growing a string
        chunks = []
        while True:
            _str = None
            try:
                _str = self.socket.recv(4096)
            except Exception as ex:
                # Best effort: report the problem and return what was received so far
                print >> sys.stderr, "ERROR:", ex
            if not _str:
                break
            chunks.append(_str)
        out = ''.join(chunks)
        if self.reconnect:
            if DEBUG:
                print >> sys.stderr, "Reconnecting..."
            self.close()
            self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
            self.__setTransport()
        return out
    else:
        self.__send('shell:')
        # sin = self.socket.makefile("rw")
        # sout = self.socket.makefile("r")
        # return (sin, sin)
        # Use this client's socket: the global 'adbClient' only exists when the
        # module is run as a script
        sout = self.socket.makefile("r")
        return sout
def getRestrictedScreen(self):
''' Gets C{mRestrictedScreen} values from dumpsys. This is a method to obtain display dimensions '''
rsRE = re.compile('\s*mRestrictedScreen=\((?P<x>\d+),(?P<y>\d+)\) (?P<w>\d+)x(?P<h>\d+)')
for line in self.shell('dumpsys window').splitlines():
m = rsRE.match(line)
if m:
return m.groups()
raise RuntimeError("Couldn't find mRestrictedScreen in 'dumpsys window'")
def getDisplayInfo(self):
self.__checkTransport()
displayInfo = self.getLogicalDisplayInfo()
if displayInfo:
return displayInfo
displayInfo = self.getPhysicalDisplayInfo()
if displayInfo:
return displayInfo
raise RuntimeError("Couldn't find display info in 'wm size', 'dumpsys display' or 'dumpsys window'")
def getLogicalDisplayInfo(self):
'''
Gets C{mDefaultViewport} and then C{deviceWidth} and C{deviceHeight} values from dumpsys.
This is a method to obtain display logical dimensions and density
'''
self.__checkTransport()
logicalDisplayRE = re.compile(
'.*DisplayViewport{valid=true, .*orientation=(?P<orientation>\d+), .*deviceWidth=(?P<width>\d+), deviceHeight=(?P<height>\d+).*')
for line in self.shell('dumpsys display').splitlines():
m = logicalDisplayRE.search(line, 0)
if m:
self.__displayInfo = {}
for prop in ['width', 'height', 'orientation']:
self.__displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
d = self.__getDisplayDensity(None, strip=True, invokeGetPhysicalDisplayIfNotFound=True)
if d:
self.__displayInfo[prop] = d
else:
# No available density information
self.__displayInfo[prop] = -1.0
return self.__displayInfo
return None
def getPhysicalDisplayInfo(self):
''' Gets C{mPhysicalDisplayInfo} values from dumpsys. This is a method to obtain display dimensions and density'''
self.__checkTransport()
phyDispRE = re.compile('Physical size: (?P<width>)x(?P<height>).*Physical density: (?P<density>)', re.MULTILINE)
m = phyDispRE.search(self.shell('wm size; wm density'))
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
displayInfo[prop] = float(m.group(prop))
return displayInfo
phyDispRE = re.compile(
'.*PhysicalDisplayInfo{(?P<width>\d+) x (?P<height>\d+), .*, density (?P<density>[\d.]+).*')
for line in self.shell('dumpsys display').splitlines():
m = phyDispRE.search(line, 0)
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
# In mPhysicalDisplayInfo density is already a factor, no need to calculate
displayInfo[prop] = float(m.group(prop))
return displayInfo
# This could also be mSystem or mOverscanScreen
phyDispRE = re.compile('\s*mUnrestrictedScreen=\((?P<x>\d+),(?P<y>\d+)\) (?P<width>\d+)x(?P<height>\d+)')
# This is known to work on older versions (i.e. API 10) where mrestrictedScreen is not available
dispWHRE = re.compile('\s*DisplayWidth=(?P<width>\d+) *DisplayHeight=(?P<height>\d+)')
for line in self.shell('dumpsys window').splitlines():
m = phyDispRE.search(line, 0)
if not m:
m = dispWHRE.search(line, 0)
if m:
displayInfo = {}
for prop in ['width', 'height']:
displayInfo[prop] = int(m.group(prop))
for prop in ['density']:
d = self.__getDisplayDensity(None, strip=True, invokeGetPhysicalDisplayIfNotFound=False)
if d:
displayInfo[prop] = d
else:
# No available density information
displayInfo[prop] = -1.0
return displayInfo
def __getProp(self, key, strip=True):
    '''
    Gets a property using C{getprop} on the device.

    @param key: the property name
    @param strip: remove the trailing CR and LF characters from the value
    @return: the output of C{getprop}
    '''
    if DEBUG:
        print >> sys.stderr, "__getProp(%s, %s)" % (key, strip)
    raw = self.shell('getprop %s' % key)
    prop = raw.rstrip('\r\n') if strip else raw
    if DEBUG:
        print >> sys.stderr, " __getProp: returning '%s'" % prop
    return prop
def __getDisplayWidth(self, key, strip=True):
if self.__displayInfo and 'width' in self.__displayInfo:
return self.__displayInfo['width']
return self.getDisplayInfo()['width']
def __getDisplayHeight(self, key, strip=True):
if self.__displayInfo and 'height' in self.__displayInfo:
return self.__displayInfo['height']
return self.getDisplayInfo()['height']
def __getDisplayOrientation(self, key, strip=True):
if self.__displayInfo and 'orientation' in self.__displayInfo:
return self.__displayInfo['orientation']
displayInfo = self.getDisplayInfo()
if 'orientation' in displayInfo:
return displayInfo['orientation']
# Fallback method to obtain the orientation
# See https://github.com/dtmilano/AndroidViewClient/issues/128
surfaceOrientationRE = re.compile('SurfaceOrientation:\s+(\d+)')
output = self.shell('dumpsys input')
m = surfaceOrientationRE.search(output)
if m:
return int(m.group(1))
# We couldn't obtain the orientation
return -1
def __getDisplayDensity(self, key, strip=True, invokeGetPhysicalDisplayIfNotFound=True):
if self.__displayInfo and 'density' in self.__displayInfo: # and self.__displayInfo['density'] != -1: # FIXME: need more testing
return self.__displayInfo['density']
BASE_DPI = 160.0
d = self.getProperty('ro.sf.lcd_density', strip)
if d:
return float(d) / BASE_DPI
d = self.getProperty('qemu.sf.lcd_density', strip)
if d:
return float(d) / BASE_DPI
if invokeGetPhysicalDisplayIfNotFound:
return self.getPhysicalDisplayInfo()['density']
return -1.0
def getSystemProperty(self, key, strip=True):
self.__checkTransport()
return self.getProperty(key, strip)
def getProperty(self, key, strip=True):
''' Gets the property value for key '''
self.__checkTransport()
import collections
MAP_PROPS = collections.OrderedDict([
(re.compile('display.width'), self.__getDisplayWidth),
(re.compile('display.height'), self.__getDisplayHeight),
(re.compile('display.density'), self.__getDisplayDensity),
(re.compile('display.orientation'), self.__getDisplayOrientation),
(re.compile('.*'), self.__getProp),
])
'''Maps properties key values (as regexps) to instance methods to obtain its values.'''
for kre in MAP_PROPS.keys():
if kre.match(key):
return MAP_PROPS[kre](key=key, strip=strip)
raise ValueError("key='%s' does not match any map entry")
def getSdkVersion(self):
    '''
    Gets the SDK version.

    @return: the value stored in the build properties under the SDK version property
    @raise RuntimeError: if the transport is not set
    '''
    self.__checkTransport()
    sdkVersion = self.build[VERSION_SDK_PROPERTY]
    return sdkVersion
def press(self, name, eventType=DOWN_AND_UP):
    '''
    Sends a key event using C{input keyevent}.

    @param name: the key name or code
    @param eventType: currently unused
    @raise RuntimeError: if the transport is not set
    '''
    self.__checkTransport()
    if isinstance(name, unicode):
        # unicode must be *encoded* to obtain a byte string. decode() implicitly
        # encodes as ASCII first and fails on non ASCII names instead of replacing.
        name = name.encode('ascii', errors='replace')
    cmd = 'input keyevent %s' % name
    if DEBUG:
        print >> sys.stderr, "press(%s)" % cmd
    self.shell(cmd)
def longPress(self, name, duration=0.5, dev='/dev/input/event0'):
    '''
    Long presses a key.

    Keys found in C{KEY_MAP} (after removing the C{KEY_} prefix) are pressed sending
    key down, waiting and then sending key up with C{sendevent}. Other keys are sent
    with C{input keyevent --longpress}.

    @param name: the key name
    @param duration: seconds between key down and key up when using C{sendevent}
    @param dev: the input device used by C{sendevent}
    @raise RuntimeError: if the transport is not set, or if the key is not in
                         C{KEY_MAP} and API < 19
    '''
    self.__checkTransport()
    # WORKAROUND:
    # Using 'input keyevent --longpress POWER' does not work correctly in
    # KitKat (API 19), it sends a short instead of a long press.
    # This uses the events instead, but it may vary from device to device.
    # The events sent are device dependent and may not work on other devices.
    # If this does not work on your device please do:
    # $ adb shell getevent -l
    # and post the output to https://github.com/dtmilano/AndroidViewClient/issues
    # specifying the device and API level.
    if name[:4] == 'KEY_':
        name = name[4:]
    if name in KEY_MAP:
        code = KEY_MAP[name]
        sync = 'sendevent %s 0 0 0' % dev
        self.shell('sendevent %s 1 %d 1' % (dev, code))
        self.shell(sync)
        time.sleep(duration)
        self.shell('sendevent %s 1 %d 0' % (dev, code))
        self.shell(sync)
        return
    version = self.getSdkVersion()
    if version < 19:
        raise RuntimeError("longpress: not supported for API < 19 (version=%d)" % version)
    cmd = 'input keyevent --longpress %s' % name
    if DEBUG:
        print >> sys.stderr, "longPress(%s)" % cmd
    self.shell(cmd)
def startActivity(self, component=None, flags=None, uri=None):
    '''
    Starts an activity using C{am start}.

    @param component: the component name, passed with C{-n}
    @param flags: the flags, passed with C{-f}
    @param uri: the URI, appended to the command
    @raise RuntimeError: if the transport is not set or if the output of the command
                         reports an error
    '''
    self.__checkTransport()
    parts = ['am start']
    if component:
        parts.append('-n %s' % component)
    if flags:
        parts.append('-f %s' % flags)
    if uri:
        parts.append('%s' % uri)
    cmd = ' '.join(parts)
    if DEBUG:
        print >> sys.stderr, "Starting activity: %s" % cmd
    out = self.shell(cmd)
    failed = re.search(r"(Error type)|(Error: )|(Cannot find 'App')", out, re.IGNORECASE | re.MULTILINE)
    if failed:
        raise RuntimeError(out)
def takeSnapshot(self, reconnect=False):
'''
Takes a snapshot of the device and return it as a PIL Image.
'''
if PROFILE:
profileStart()
global PIL_AVAILABLE
if not PIL_AVAILABLE:
try:
global Image
from PIL import Image
PIL_AVAILABLE = True
except:
raise Exception("You have to install PIL to use takeSnapshot()")
USE_ADB_FRAMEBUFFER_METHOD = (self.getSdkVersion() < 14 or self.getSdkVersion() >= 23)
if USE_ADB_FRAMEBUFFER_METHOD:
self.__checkTransport()
self.__send('framebuffer:', checkok=True, reconnect=False)
# case 1: // version
# return 12; // bpp, size, width, height, 4*(length, offset)
received = self.__receive(1 * 4 + 12 * 4)
(version, bpp, size, width, height, roffset, rlen, boffset, blen, goffset, glen, aoffset,
alen) = struct.unpack(
'<' + 'L' * 13, received)
if DEBUG:
print >> sys.stderr, " takeSnapshot:", (
version, bpp, size, width, height, roffset, rlen, boffset, blen, goffset, glen, aoffset, alen)
offsets = {roffset: 'R', goffset: 'G', boffset: 'B'}
if bpp == 32:
if alen != 0:
offsets[aoffset] = 'A'
else:
warnings.warn('''framebuffer is specified as 32bpp but alpha length is 0''')
argMode = ''.join([offsets[o] for o in sorted(offsets)])
if DEBUG:
print >> sys.stderr, " takeSnapshot:", (
version, bpp, size, width, height, roffset, rlen, boffset, blen, goffset, blen, aoffset, alen,
argMode)
if argMode == 'BGRA':
argMode = 'RGBA'
if bpp == 16:
mode = 'RGB'
argMode += ';16'
else:
mode = argMode
self.__send('\0', checkok=False, reconnect=False)
if DEBUG:
print >> sys.stderr, " takeSnapshot: reading %d bytes" % (size)
received = self.__receive(size)
if reconnect:
self.socket = AdbClient.connect(self.hostname, self.port, self.timeout)
self.__setTransport()
if DEBUG:
print >> sys.stderr, " takeSnapshot: Image.frombuffer(%s, %s, %s, %s, %s, %s, %s)" % (
mode, (width, height), 'data', 'raw', argMode, 0, 1)
image = Image.frombuffer(mode, (width, height), received, 'raw', argMode, 0, 1)
else:
# ALTERNATIVE_METHOD: screencap
received = self.shell('/system/bin/screencap -p').replace("\r\n", "\n")
stream = StringIO.StringIO(received)
try:
image = Image.open(stream)
except IOError, ex:
print >> sys.stderr, ex
print repr(stream)
sys.exit(1)
# Just in case let's get the real image size
(w, h) = image.size
if w == self.display['height'] and h == self.display['width']:
# FIXME: We are not catching the 180 degrees rotation here
if 'orientation' in self.display:
r = (0, 90, 180, -90)[self.display['orientation']]
else:
r = 90
image = image.rotate(r, expand=1).resize((h, w))
if PROFILE:
profileEnd()
return image
def __transformPointByOrientation(self, (x, y), orientationOrig, orientationDest):
if orientationOrig != orientationDest:
if orientationDest == 1:
_x = x
x = self.display['width'] - y
y = _x
elif orientationDest == 3:
_x = x
x = y
y = self.display['height'] - _x
return (x, y)
def touch(self, x, y, orientation=-1, eventType=DOWN_AND_UP):
    '''
    Touches the point using C{input tap}.

    @param x: the x coordinate in PX
    @param y: the y coordinate in PX
    @param orientation: the orientation the point is expressed in (-1: the display orientation)
    @param eventType: currently unused
    @raise RuntimeError: if the transport is not set
    '''
    if DEBUG_TOUCH:
        print >> sys.stderr, "touch(x=", x, ", y=", y, ", orientation=", orientation, ", eventType=", eventType, ")"
    self.__checkTransport()
    displayOrientation = self.display['orientation']
    if orientation == -1:
        orientation = displayOrientation
    point = self.__transformPointByOrientation((x, y), orientation, displayOrientation)
    self.shell('input tap %d %d' % point)
def touchDip(self, x, y, orientation=-1, eventType=DOWN_AND_UP):
    '''
    Touches the point expressed in DIP.

    @param x: the x coordinate in DIP
    @param y: the y coordinate in DIP
    @param orientation: the orientation the point is expressed in (-1: the display orientation)
    @param eventType: passed to L{touch}
    @raise RuntimeError: if the transport is not set
    '''
    if DEBUG_TOUCH:
        print >> sys.stderr, "touchDip(x=", x, ", y=", y, ", orientation=", orientation, ", eventType=", eventType, ")"
    self.__checkTransport()
    if orientation == -1:
        orientation = self.display['orientation']
    # An unknown density is stored as -1.0: as dragDip() does, use a factor of 1
    # in that case instead of producing negative coordinates
    density = self.display['density'] if self.display['density'] > 0 else 1
    x = x * density
    y = y * density
    self.touch(x, y, orientation, eventType)
def longTouch(self, x, y, duration=2000, orientation=-1):
'''
Long touches at (x, y)
@param duration: duration in ms
@param orientation: the orientation (-1: undefined)
This workaround was suggested by U{HaMi<http://stackoverflow.com/users/2571957/hami>}
'''
self.__checkTransport()
self.drag((x, y), (x, y), duration, orientation)
def drag(self, (x0, y0), (x1, y1), duration, steps=1, orientation=-1):
'''
Sends drag event n PX (actually it's using C{input swipe} command.
@param (x0, y0): starting point in PX
@param (x1, y1): ending point in PX
@param duration: duration of the event in ms
@param steps: number of steps (currently ignored by @{input swipe})
@param orientation: the orientation (-1: undefined)
'''
self.__checkTransport()
if orientation == -1:
orientation = self.display['orientation']
(x0, y0) = self.__transformPointByOrientation((x0, y0), orientation, self.display['orientation'])
(x1, y1) = self.__transformPointByOrientation((x1, y1), orientation, self.display['orientation'])
version = self.getSdkVersion()
if version <= 15:
raise RuntimeError('drag: API <= 15 not supported (version=%d)' % version)
elif version <= 17:
self.shell('input swipe %d %d %d %d' % (x0, y0, x1, y1))
else:
self.shell('input touchscreen swipe %d %d %d %d %d' % (x0, y0, x1, y1, duration))
def dragDip(self, (x0, y0), (x1, y1), duration, steps=1, orientation=-1):
'''
Sends drag event in DIP (actually it's using C{input swipe} command.
@param (x0, y0): starting point in DIP
@param (x1, y1): ending point in DIP
@param duration: duration of the event in ms
@param steps: number of steps (currently ignored by @{input swipe}
'''
self.__checkTransport()
if orientation == -1:
orientation = self.display['orientation']
density = self.display['density'] if self.display['density'] > 0 else 1
x0 = x0 * density
y0 = y0 * density
x1 = x1 * density
y1 = y1 * density
self.drag((x0, y0), (x1, y1), duration, steps, orientation)
def type(self, text):
self.__checkTransport()
if type(text) is str:
escaped = text.replace('%s', '\\%s')
encoded = escaped.replace(' ', '%s')
else:
encoded = str(text);
#FIXME find out which characters can be dangerous,
# for exmaple not worst idea to escape "
self.shell(u'input text "%s"' % encoded)
def wake(self):
self.__checkTransport()
if not self.isScreenOn():
self.shell('input keyevent POWER')
def isLocked(self):
'''
Checks if the device screen is locked.
@return True if the device screen is locked
'''
self.__checkTransport()
lockScreenRE = re.compile('mShowingLockscreen=(true|false)')
m = lockScreenRE.search(self.shell('dumpsys window policy'))
if m:
return (m.group(1) == 'true')
raise RuntimeError("Couldn't determine screen lock state")
def isScreenOn(self):
'''
Checks if the screen is ON.
@return True if the device screen is ON
'''
self.__checkTransport()
screenOnRE = re.compile('mScreenOnFully=(true|false)')
m = screenOnRE.search(self.shell('dumpsys window policy'))
if m:
return (m.group(1) == 'true')
raise RuntimeError("Couldn't determine screen ON state")
def unlock(self):
'''
Unlocks the screen of the device.
'''
self.__checkTransport()
self.shell('input keyevent MENU')
self.shell('input keyevent BACK')
@staticmethod
def percentSame(image1, image2):
'''
Returns the percent of pixels that are equal
@author: catshoes
'''
# If the images differ in size, return 0% same.
size_x1, size_y1 = image1.size
size_x2, size_y2 = image2.size
if (size_x1 != size_x2 or
size_y1 != size_y2):
return 0
# Images are the same size
# Return the percent of pixels that are equal.
numPixelsSame = 0
numPixelsTotal = size_x1 * size_y1
image1Pixels = image1.load()
image2Pixels = image2.load()
# Loop over all pixels, comparing pixel in image1 to image2
for x in range(size_x1):
for y in range(size_y1):
if (image1Pixels[x, y] == image2Pixels[x, y]):
numPixelsSame += 1
return numPixelsSame / float(numPixelsTotal)
@staticmethod
def sameAs(image1, image2, percent=1.0):
'''
Compares 2 images
@author: catshoes
'''
return (AdbClient.percentSame(image1, image2) >= percent)
@staticmethod
def imageInScreen(screen, image):
'''
Checks if image is on the screen
:param screen: the screen image
:param image: the partial image to look for
:return: True or False
@author: Perry Tsai <ripple0129@gmail.com>
'''
# To make sure image smaller than screen.
size_x1, size_y1 = screen.size
size_x2, size_y2 = image.size
if size_x1 <= size_x2 or size_y1 <= size_y2:
return 0
# Load pixels.
screenPixels = screen.load()
imagePixels = image.load()
# Loop over all pixels, if pixel image[0,0] same as pixel screen[x,y] do crop and compare
for x in range(size_x1 - size_x2):
for y in range(size_y1 - size_y2):
if imagePixels[0, 0] == screenPixels[x, y]:
croppedScreen = screen.crop((x, y, x + size_x2, y + size_y2))
size_x3, size_y3 = croppedScreen.size
croppedPixels = croppedScreen.load()
for x in range(size_x3):
for y in range(size_y3):
if imagePixels[x, y] == croppedPixels[x, y]:
return True
def isKeyboardShown(self):
'''
Whether the keyboard is displayed.
'''
self.__checkTransport()
dim = self.shell('dumpsys input_method')
if dim:
# FIXME: API >= 15 ?
return "mInputShown=true" in dim
return False
def initDisplayProperties(self):
self.__checkTransport()
self.__displayInfo = None
self.display['width'] = self.getProperty('display.width')
self.display['height'] = self.getProperty('display.height')
self.display['density'] = self.getProperty('display.density')
self.display['orientation'] = self.getProperty('display.orientation')
def log(self, tag, message, priority='D', verbose=False):
    '''
    Writes a message to the device log using the C{log} command.

    @param tag: the log tag
    @param message: the message, which is processed by L{substituteDeviceTemplate}
    @param priority: the priority character
    @param verbose: also print the message to stderr (always done for priority 'V')
    @raise RuntimeError: if the transport is not set
    '''
    if DEBUG_LOG:
        print >> sys.stderr, "log(tag=%s, message=%s, priority=%s, verbose=%s)" % (tag, message, priority, verbose)
    self.__checkTransport()
    message = self.substituteDeviceTemplate(message)
    if priority == 'V' or verbose:
        print >> sys.stderr, tag + ':', message
    cmd = 'log -p %c -t "%s" %s' % (priority, tag, message)
    self.shell(cmd)
class __Log():
'''
Log class to simulate C{android.util.Log}
'''
def __init__(self, adbClient):
self.adbClient = adbClient
def __getattr__(self, attr):
'''
Returns the corresponding log method or @C{AttributeError}.
'''
if attr in ['v', 'd', 'i', 'w', 'e']:
return lambda tag, message, verbose: self.adbClient.log(tag, message, priority=attr.upper(),
verbose=verbose)
raise AttributeError(self.__class__.__name__ + ' has no attribute "%s"' % attr)
def getSystemService(self, name):
    '''
    Gets the object simulating an Android system service.

    @param name: the service name
    @return: a L{WifiManager} for the wifi service, C{None} for any other name
    '''
    if name != WIFI_SERVICE:
        return None
    return WifiManager(self)
def getWindows(self):
    '''
    Gets the windows listed by C{dumpsys window windows}.

    For every C{Window #n} entry the lines that follow (up to the next entry) are
    scanned for the view visibility, the frames and the policy visibility.
    The window named by C{mCurrentFocus} is marked as focused if it is in the map
    and its visibility is 0.

    @return: the map of L{Window} by window id
    @raise RuntimeError: if the transport is not set
    '''
    self.__checkTransport()
    windows = {}
    dww = self.shell('dumpsys window windows')
    if DEBUG_WINDOWS: print >> sys.stderr, dww
    lines = dww.splitlines()
    # NOTE(review): _nd, _nh and _ns are imported helpers, presumably building named
    # groups for decimal, hex and string values; confirm in com.dtmilano.android.common
    widRE = re.compile('^ *Window #%s Window{%s (u\d+ )?%s?.*}:' %
                       (_nd('num'), _nh('winId'), _ns('activity', greedy=True)))
    currentFocusRE = re.compile('^ mCurrentFocus=Window{%s .*' % _nh('winId'))
    viewVisibilityRE = re.compile(' mViewVisibility=0x%s ' % _nh('visibility'))
    # This is for 4.0.4 API-15
    containingFrameRE = re.compile('^ *mContainingFrame=\[%s,%s\]\[%s,%s\] mParentFrame=\[%s,%s\]\[%s,%s\]' %
                                   (_nd('cx'), _nd('cy'), _nd('cw'), _nd('ch'), _nd('px'), _nd('py'), _nd('pw'),
                                    _nd('ph')))
    contentFrameRE = re.compile('^ *mContentFrame=\[%s,%s\]\[%s,%s\] mVisibleFrame=\[%s,%s\]\[%s,%s\]' %
                                (_nd('x'), _nd('y'), _nd('w'), _nd('h'), _nd('vx'), _nd('vy'), _nd('vx1'),
                                 _nd('vy1')))
    # This is for 4.1 API-16
    framesRE = re.compile('^ *Frames: containing=\[%s,%s\]\[%s,%s\] parent=\[%s,%s\]\[%s,%s\]' %
                          (_nd('cx'), _nd('cy'), _nd('cw'), _nd('ch'), _nd('px'), _nd('py'), _nd('pw'), _nd('ph')))
    contentRE = re.compile('^ *content=\[%s,%s\]\[%s,%s\] visible=\[%s,%s\]\[%s,%s\]' %
                           (_nd('x'), _nd('y'), _nd('w'), _nd('h'), _nd('vx'), _nd('vy'), _nd('vx1'), _nd('vy1')))
    policyVisibilityRE = re.compile('mPolicyVisibility=%s ' % _ns('policyVisibility', greedy=True))
    currentFocus = None
    for l in range(len(lines)):
        m = widRE.search(lines[l])
        if m:
            # Beginning of a window entry: reset the values collected for it
            num = int(m.group('num'))
            winId = m.group('winId')
            activity = m.group('activity')
            wvx = 0
            wvy = 0
            wvw = 0
            wvh = 0
            px = 0
            py = 0
            visibility = -1
            policyVisibility = 0x0
            # Scan the lines of this entry, which ends where the next entry begins
            for l2 in range(l + 1, len(lines)):
                m = widRE.search(lines[l2])
                if m:
                    # NOTE(review): this assignment does not skip any line, the outer
                    # 'for' rebinds l on its next iteration
                    l += (l2 - 1)
                    break
                m = viewVisibilityRE.search(lines[l2])
                if m:
                    visibility = int(m.group('visibility'))
                    if DEBUG_COORDS: print >> sys.stderr, "getWindows: visibility=", visibility
                if self.build[VERSION_SDK_PROPERTY] >= 17:
                    wvx, wvy = (0, 0)
                    wvw, wvh = (0, 0)
                if self.build[VERSION_SDK_PROPERTY] >= 16:
                    m = framesRE.search(lines[l2])
                    if m:
                        px, py = obtainPxPy(m)
                        # The content frame is expected on the line right after the frames.
                        # NOTE(review): l2 + 1 is out of range if the frames are on the last line
                        m = contentRE.search(lines[l2 + 1])
                        if m:
                            # FIXME: the information provided by 'dumpsys window windows' in 4.2.1 (API 16)
                            # when there's a system dialog may not be correct and causes the View coordinates
                            # be offset by this amount, see
                            # https://github.com/dtmilano/AndroidViewClient/issues/29
                            wvx, wvy = obtainVxVy(m)
                            wvw, wvh = obtainVwVh(m)
                elif self.build[VERSION_SDK_PROPERTY] == 15:
                    m = containingFrameRE.search(lines[l2])
                    if m:
                        px, py = obtainPxPy(m)
                        m = contentFrameRE.search(lines[l2 + 1])
                        if m:
                            wvx, wvy = obtainVxVy(m)
                            wvw, wvh = obtainVwVh(m)
                elif self.build[VERSION_SDK_PROPERTY] == 10:
                    m = containingFrameRE.search(lines[l2])
                    if m:
                        px, py = obtainPxPy(m)
                        m = contentFrameRE.search(lines[l2 + 1])
                        if m:
                            wvx, wvy = obtainVxVy(m)
                            wvw, wvh = obtainVwVh(m)
                else:
                    warnings.warn("Unsupported Android version %d" % self.build[VERSION_SDK_PROPERTY])
                # print >> sys.stderr, "Searching policyVisibility in", lines[l2]
                m = policyVisibilityRE.search(lines[l2])
                if m:
                    # A policy visibility other than 'true' adds 0x8 to the visibility
                    policyVisibility = 0x0 if m.group('policyVisibility') == 'true' else 0x8
            windows[winId] = Window(num, winId, activity, wvx, wvy, wvw, wvh, px, py, visibility + policyVisibility)
        else:
            m = currentFocusRE.search(lines[l])
            if m:
                currentFocus = m.group('winId')
    if currentFocus in windows and windows[currentFocus].visibility == 0:
        if DEBUG_COORDS:
            print >> sys.stderr, "getWindows: focus=", currentFocus
            print >> sys.stderr, "getWindows:", windows[currentFocus]
        windows[currentFocus].focused = True
    return windows
def getFocusedWindow(self):
'''
Gets the focused window.
@return: The focused L{Window}.
'''
for window in self.getWindows().values():
if window.focused:
return window
return None
def getFocusedWindowName(self):
'''
Gets the focused window name.
This is much like monkeyRunner's C{HierarchyView.getWindowName()}
@return: The focused window name
'''
window = self.getFocusedWindow()
if window:
return window.activity
return None
def getTopActivityNameAndPid(self):
dat = self.shell('dumpsys activity top')
lines = dat.splitlines()
activityRE = re.compile('\s*ACTIVITY ([A-Za-z0-9_.]+)/([A-Za-z0-9_.]+) \w+ pid=(\d+)')
m = activityRE.search(lines[1])
if m:
return (m.group(1), m.group(2), m.group(3))
else:
warnings.warn("NO MATCH:" + lines[1])
return None
def getTopActivityName(self):
tanp = self.getTopActivityNameAndPid()
if tanp:
return tanp[0] + '/' + tanp[1]
else:
return None
def substituteDeviceTemplate(self, template):
serialno = self.serialno.replace('.', '_').replace(':', '-')
focusedWindowName = self.getFocusedWindowName().replace('/', '-').replace('.', '_')
timestamp = datetime.datetime.now().isoformat()
osName = platform.system()
if osName.startswith('Windows'): # ':' not supported in filenames
timestamp.replace(':', '_')
_map = {
'serialno': serialno,
'focusedwindowname': focusedWindowName,
'timestamp': timestamp
}
return string.Template(template).substitute(_map)
if __name__ == '__main__':
adbClient = AdbClient(os.environ['ANDROID_SERIAL'])
INTERACTIVE = False
if INTERACTIVE:
sout = adbClient.shell()
prompt = re.compile(".+@android:(.*) [$#] \r\r\n")
while True:
try:
cmd = raw_input('adb $ ')
except EOFError:
break
if cmd == 'exit':
break
adbClient.socket.__send(cmd + "\r\n")
sout.readline(4096) # eat first line, which is the command
while True:
line = sout.readline(4096)
if prompt.match(line):
break
print line,
if not line:
break
print "\nBye"
else:
print 'date:', adbClient.shell('date')