Commit 7b2cf724 authored by tengs's avatar tengs Committed by Commit bot

Add end-to-end testing tool for Smart Lock.

This tool is based on telemetry and exercises the Smart Lock setup flow. This CL
also introduces the framework that will allow implementing other end-to-end
tests.

BUG=chromium:442728
NOTRY=true

Review URL: https://codereview.chromium.org/1004283002

Cr-Commit-Position: refs/heads/master@{#322604}
parent 4f788f02
This diff is collapsed.
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import httplib
import json
import logging
import pprint
import time
logger = logging.getLogger('proximity_auth.%s' % __name__)
_GOOGLE_APIS_URL = 'www.googleapis.com'
_REQUEST_PATH = '/cryptauth/v1/%s?alt=JSON'
class CryptAuthClient(object):
""" A client for making blocking CryptAuth API calls. """
def __init__(self, access_token, google_apis_url=_GOOGLE_APIS_URL):
self._access_token = access_token
self._google_apis_url = google_apis_url
def GetMyDevices(self):
""" Invokes the GetMyDevices API.
Returns:
A list of devices or None if the API call fails.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
request_data = {
'approvedForUnlockRequired': False,
'allowStaleRead': False,
'invocationReason': 13 # REASON_MANUAL
}
response = self._SendRequest('deviceSync/getmydevices', request_data)
return response['devices'] if response is not None else None
def GetUnlockKey(self):
"""
Returns:
The unlock key registered with CryptAuth if it exists or None.
The device is a dictionary of the deserialized JSON returned by CryptAuth.
"""
devices = self.GetMyDevices()
if devices is None:
return None
for device in devices:
if device['unlockKey']:
return device
return None
def ToggleEasyUnlock(self, enable, public_key=''):
""" Calls the ToggleEasyUnlock API.
Args:
enable: True to designate the device specified by |public_key| as an
unlock key.
public_key: The public key of the device to toggle. Ignored if |enable| is
False, which toggles all unlock keys off.
Returns:
True upon success, else False.
"""
request_data = { 'enable': enable, }
if not enable:
request_data['applyToAll'] = True
else:
request_data['publicKey'] = public_key
response = self._SendRequest('deviceSync/toggleeasyunlock', request_data)
return response is not None
def FindEligibleUnlockDevices(self, time_delta_millis=None):
""" Finds devices eligible to be an unlock key.
Args:
time_delta_millis: If specified, then only return eligible devices that
have contacted CryptAuth in the last time delta.
Returns:
A tuple containing two lists, one of eligible devices and the other of
ineligible devices.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
request_data = {}
if time_delta_millis is not None:
request_data['maxLastUpdateTimeDeltaMillis'] = time_delta_millis * 1000;
response = self._SendRequest(
'deviceSync/findeligibleunlockdevices', request_data)
if response is None:
return None
eligibleDevices = (
response['eligibleDevices'] if 'eligibleDevices' in response else [])
ineligibleDevices = (
response['ineligibleDevices'] if (
'ineligibleDevices' in response) else [])
return eligibleDevices, ineligibleDevices
def PingPhones(self, timeout_secs=10):
""" Asks CryptAuth to ping registered phones and determine their status.
Args:
timeout_secs: The number of seconds to wait for phones to respond.
Returns:
A tuple containing two lists, one of eligible devices and the other of
ineligible devices.
Each device is a dictionary of the deserialized JSON returned by
CryptAuth.
"""
response = self._SendRequest(
'deviceSync/senddevicesynctickle',
{ 'tickleType': 'updateEnrollment' })
if response is None:
return None
# We wait for phones to update their status with CryptAuth.
logger.info('Waiting for %s seconds for phone status...' % timeout_secs)
time.sleep(timeout_secs)
return self.FindEligibleUnlockDevices(time_delta_millis=timeout_secs)
def _SendRequest(self, function_path, request_data):
""" Sends an HTTP request to CryptAuth and returns the deserialized
response.
"""
conn = httplib.HTTPSConnection(self._google_apis_url)
path = _REQUEST_PATH % function_path
headers = {
'authorization': 'Bearer ' + self._access_token,
'Content-Type': 'application/json'
}
body = json.dumps(request_data)
logger.info('Making request to %s with body:\n%s' % (
path, pprint.pformat(request_data)))
conn.request('POST', path, body, headers)
response = conn.getresponse()
if response.status == 204:
return {}
if response.status != 200:
logger.warning('Request to %s failed: %s' % (path, response.status))
return None
return json.loads(response.read())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Script that exercises the Smart Lock setup flow, testing that a nearby phone
can be found and used to unlock a Chromebook.
Note: This script does not currently automate Android phones, so make sure that
a phone is properly configured and online before starting the test.
Usage:
python setup_test.py --remote_address REMOTE_ADDRESS
--username USERNAME
--password PASSWORD
[--app_path APP_PATH]
[--ssh_port SSH_PORT]
If |--app_path| is provided, then a copy of the Smart Lock app on the local
machine will be used instead of the app on the ChromeOS device.
"""
import argparse
import cros
import cryptauth
import logging
import os
import subprocess
import sys
import tempfile
logger = logging.getLogger('proximity_auth.%s' % __name__)
class SmartLockSetupError(Exception):
pass
def pingable_address(address):
try:
subprocess.check_output(['ping', '-c', '1', '-W', '1', address])
except subprocess.CalledProcessError:
raise argparse.ArgumentError('%s cannot be reached.' % address)
return address
def email(arg):
tokens = arg.lower().split('@')
if len(tokens) != 2 or '.' not in tokens[1]:
raise argparse.ArgumentError('%s is not a valid email address' % arg)
name, domain = tokens
if domain == 'gmail.com':
name = name.replace('.', '')
return '@'.join([name, domain])
def directory(path):
if not os.path.isdir(path):
raise argparse.ArgumentError('%s is not a directory' % path)
return path
def ParseArgs():
parser = argparse.ArgumentParser(prog='python setup_test.py')
parser.add_argument('--remote_address', required=True, type=pingable_address)
parser.add_argument('--username', required=True, type=email)
parser.add_argument('--password', required=True)
parser.add_argument('--ssh_port', type=int)
parser.add_argument('--app_path', type=directory)
args = parser.parse_args()
return args
def CheckCryptAuthState(access_token):
cryptauth_client = cryptauth.CryptAuthClient(access_token)
# Check if we can make CryptAuth requests.
if cryptauth_client.GetMyDevices() is None:
logger.error('Cannot reach CryptAuth on test machine.')
return False
if cryptauth_client.GetUnlockKey() is not None:
logger.info('Smart Lock currently enabled, turning off on Cryptauth...')
if not cryptauth_client.ToggleEasyUnlock(False):
logger.error('ToggleEasyUnlock request failed.')
return False
result = cryptauth_client.FindEligibleUnlockDevices()
if result is None:
logger.error('FindEligibleUnlockDevices request failed')
return False
eligibleDevices, _ = result
if len(eligibleDevices) == 0:
logger.warn('No eligible phones found, trying to ping phones...')
result = cryptauth_client.PingPhones()
if result is None or not len(result[0]):
logger.error('Pinging phones failed :(')
return False
else:
logger.info('Pinging phones succeeded!')
else:
logger.info('Found eligible device: %s' % (
eligibleDevices[0]['friendlyDeviceName']))
return True
def _NavigateSetupDialog(chromeos, app):
logger.info('Scanning for nearby phones...')
btmon = chromeos.RunBtmon()
find_phone_success = app.FindPhone()
btmon.terminate()
if not find_phone_success:
fd, filepath = tempfile.mkstemp(prefix='btmon-')
os.write(fd, btmon.stdout.read())
os.close(fd)
logger.info('Logs for btmon can be found at %s' % filepath)
raise SmartLockSetupError("Failed to find nearby phone.")
logger.info('Phone found! Starting pairing...')
if not app.PairPhone():
raise SmartLockSetupError("Failed to pair with phone.")
logger.info('Pairing success! Starting trial run...')
app.StartTrialRun()
logger.info('Unlocking for trial run...')
lock_screen = chromeos.GetAccountPickerScreen()
lock_screen.WaitForSmartLockState(
lock_screen.SmartLockState.AUTHENTICATED)
lock_screen.UnlockWithClick()
logger.info('Trial run success! Dismissing app...')
app.DismissApp()
def RunSetupTest(args):
logger.info('Starting test for %s at %s' % (
args.username, args.remote_address))
if args.app_path is not None:
logger.info('Replacing Smart Lock app with %s' % args.app_path)
chromeos = cros.ChromeOS(
args.remote_address, args.username, args.password, ssh_port=args.ssh_port)
with chromeos.Start(local_app_path=args.app_path):
logger.info('Chrome initialized')
# TODO(tengs): The access token is currently fetched from the Smart Lock
# app's background page. To be more robust, we should instead mint the
# access token ourselves.
if not CheckCryptAuthState(chromeos.cryptauth_access_token):
raise SmartLockSetupError('Failed to check CryptAuth state')
logger.info('Opening Smart Lock settings...')
settings = chromeos.GetSmartLockSettings()
assert(not settings.is_smart_lock_enabled)
logger.info('Starting Smart Lock setup flow...')
app = settings.StartSetupAndReturnApp()
_NavigateSetupDialog(chromeos, app)
def main():
logging.basicConfig()
logging.getLogger('proximity_auth').setLevel(logging.INFO)
args = ParseArgs()
RunSetupTest(args)
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment