Commit ab18bcca authored by perezju's avatar perezju Committed by Commit bot

New run shell implementation for DeviceUtils

The main differences are:
- it uses AdbWrapper.Shell to actually execute the command.
- when the cmd is supplied as a list of a command and its arguments,
  the arguments are quoted to prevent them from being (mis)interpreted
  by the shell.
- a new single_line option to check that the output produces contains a single line, and return the value of that line.

BUG=267773

Review URL: https://codereview.chromium.org/659533002

Cr-Commit-Position: refs/heads/master@{#300237}
parent c2ff5667
......@@ -9,6 +9,7 @@ import os
import pipes
import select
import signal
import string
import StringIO
import subprocess
import time
......@@ -19,6 +20,52 @@ try:
except ImportError:
fcntl = None
_SafeShellChars = frozenset(string.ascii_letters + string.digits + '@%_-+=:,./')
def SingleQuote(s):
"""Return an shell-escaped version of the string using single quotes.
Reliably quote a string which may contain unsafe characters (e.g. space,
quote, or other special characters such as '$').
The returned value can be used in a shell command line as one token that gets
to be interpreted literally.
Args:
s: The string to quote.
Return:
The string quoted using single quotes.
"""
return pipes.quote(s)
def DoubleQuote(s):
"""Return an shell-escaped version of the string using double quotes.
Reliably quote a string which may contain unsafe characters (e.g. space
or quote characters), while retaining some shell features such as variable
interpolation.
The returned value can be used in a shell command line as one token that gets
to be further interpreted by the shell.
The set of characters that retain their special meaning may depend on the
shell implementation. This set usually includes: '$', '`', '\', '!', '*',
and '@'.
Args:
s: The string to quote.
Return:
The string quoted using double quotes.
"""
if not s:
return '""'
elif all(c in _SafeShellChars for c in s):
return s
else:
return '"' + s.replace('"', '\\"') + '"'
def Popen(args, stdout=None, stderr=None, shell=None, cwd=None, env=None):
return subprocess.Popen(
......@@ -88,7 +135,7 @@ def GetCmdStatusAndOutput(args, cwd=None, shell=False):
elif shell:
raise Exception('array args must be run with shell=False')
else:
args_repr = ' '.join(map(pipes.quote, args))
args_repr = ' '.join(map(SingleQuote, args))
s = '[host]'
if cwd:
......
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for the cmd_helper module."""
import unittest
from pylib import cmd_helper
# TODO(jbudorick) Make these tests run on the bots.
class CmdHelperSingleQuoteTest(unittest.TestCase):
def testSingleQuote_basic(self):
self.assertEquals('hello',
cmd_helper.SingleQuote('hello'))
def testSingleQuote_withSpaces(self):
self.assertEquals("'hello world'",
cmd_helper.SingleQuote('hello world'))
def testSingleQuote_withUnsafeChars(self):
self.assertEquals("""'hello'"'"'; rm -rf /'""",
cmd_helper.SingleQuote("hello'; rm -rf /"))
def testSingleQuote_dontExpand(self):
test_string = 'hello $TEST_VAR'
cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
self.assertEquals(test_string,
cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
class CmdHelperDoubleQuoteTest(unittest.TestCase):
def testDoubleQuote_basic(self):
self.assertEquals('hello',
cmd_helper.DoubleQuote('hello'))
def testDoubleQuote_withSpaces(self):
self.assertEquals('"hello world"',
cmd_helper.DoubleQuote('hello world'))
def testDoubleQuote_withUnsafeChars(self):
self.assertEquals('''"hello\\"; rm -rf /"''',
cmd_helper.DoubleQuote('hello"; rm -rf /'))
def testSingleQuote_doExpand(self):
test_string = 'hello $TEST_VAR'
cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
self.assertEquals('hello world',
cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
......@@ -176,13 +176,11 @@ class AdbWrapper(object):
['shell', actual_command], timeout, retries, check_error=False)
if expect_rc is not None:
output_end = output.rstrip().rfind('\n') + 1
rc = output[output_end:].strip()
rc = int(output[output_end:].strip())
output = output[:output_end]
if int(rc) != expect_rc:
raise device_errors.AdbCommandFailedError(
['shell', command],
'shell command exited with code: %s' % rc,
self._device_serial)
if rc != expect_rc:
raise device_errors.AdbShellCommandFailedError(
command, rc, output, self._device_serial)
return output
def Logcat(self, filter_spec=None, timeout=_DEFAULT_TIMEOUT,
......
......@@ -41,8 +41,8 @@ class TestAdbWrapper(unittest.TestCase):
self.assertEqual(output.strip(), 'test')
output = self._adb.Shell('echo test')
self.assertEqual(output.strip(), 'test')
self.assertRaises(device_errors.AdbCommandFailedError, self._adb.Shell,
'echo test', expect_rc=1)
self.assertRaises(device_errors.AdbShellCommandFailedError,
self._adb.Shell, 'echo test', expect_rc=1)
def testPushPull(self):
path = self._MakeTempFile('foo')
......
......@@ -24,10 +24,23 @@ class AdbCommandFailedError(CommandFailedError):
def __init__(self, cmd, msg, device=None):
super(AdbCommandFailedError, self).__init__(
'adb command \'%s\' failed with message: \'%s\'' % (' '.join(cmd), msg),
'adb command %r failed with message: %s' % (' '.join(cmd), msg),
device=device)
class AdbShellCommandFailedError(AdbCommandFailedError):
"""Exception for adb shell command failing with non-zero return code."""
def __init__(self, cmd, return_code, output, device=None):
super(AdbShellCommandFailedError, self).__init__(
['shell'],
'command %r on device failed with return code %d and output %r'
% (cmd, return_code, output),
device=device)
self.return_code = return_code
self.output = output
class CommandTimeoutError(BaseError):
"""Exception for command timeouts."""
pass
......
......@@ -11,13 +11,14 @@ Eventually, this will be based on adb_wrapper.
import logging
import multiprocessing
import os
import pipes
import re
import sys
import tempfile
import time
import zipfile
import pylib.android_commands
from pylib import cmd_helper
from pylib.device import adb_wrapper
from pylib.device import decorators
from pylib.device import device_errors
......@@ -54,6 +55,8 @@ def RestartServer():
class DeviceUtils(object):
_VALID_SHELL_VARIABLE = re.compile('^[a-zA-Z_][a-zA-Z0-9_]*$')
def __init__(self, device, default_timeout=_DEFAULT_TIMEOUT,
default_retries=_DEFAULT_RETRIES):
"""DeviceUtils constructor.
......@@ -87,6 +90,7 @@ class DeviceUtils(object):
self._commands_installed = None
self._default_timeout = default_timeout
self._default_retries = default_retries
self._cache = {}
assert(hasattr(self, decorators.DEFAULT_TIMEOUT_ATTR))
assert(hasattr(self, decorators.DEFAULT_RETRIES_ATTR))
......@@ -127,7 +131,11 @@ class DeviceUtils(object):
return self._HasRootImpl()
def _HasRootImpl(self):
return self.old_interface.IsRootEnabled()
try:
self._RunShellCommandImpl('ls /root', check_return=True)
return True
except device_errors.AdbShellCommandFailedError:
return False
@decorators.WithTimeoutAndRetriesFromInstance()
def EnableRoot(self, timeout=None, retries=None):
......@@ -182,11 +190,17 @@ class DeviceUtils(object):
return self._GetExternalStoragePathImpl()
def _GetExternalStoragePathImpl(self):
try:
return self.old_interface.GetExternalStorage()
except AssertionError as e:
raise device_errors.CommandFailedError(
str(e), device=str(self)), None, sys.exc_info()[2]
if 'external_storage' in self._cache:
return self._cache['external_storage']
value = self._RunShellCommandImpl('echo $EXTERNAL_STORAGE',
single_line=True,
check_return=True)
if not value:
raise device_errors.CommandFailedError('$EXTERNAL_STORAGE is not set',
str(self))
self._cache['external_storage'] = value
return value
@decorators.WithTimeoutAndRetriesFromInstance()
def WaitUntilFullyBooted(self, wifi=False, timeout=None, retries=None):
......@@ -216,7 +230,7 @@ class DeviceUtils(object):
self.old_interface.WaitForSdCardReady(timeout)
if wifi:
while not 'Wi-Fi is enabled' in (
self._RunShellCommandImpl('dumpsys wifi')):
self.old_interface.RunShellCommand('dumpsys wifi')):
time.sleep(1)
REBOOT_DEFAULT_TIMEOUT = 10 * _DEFAULT_TIMEOUT
......@@ -292,57 +306,95 @@ class DeviceUtils(object):
str(e), device=str(self)), None, sys.exc_info()[2]
@decorators.WithTimeoutAndRetriesFromInstance()
def RunShellCommand(self, cmd, check_return=False, as_root=False, cwd=None,
env=None, timeout=None, retries=None):
def RunShellCommand(self, cmd, check_return=False, cwd=None, env=None,
as_root=False, single_line=False,
timeout=None, retries=None):
"""Run an ADB shell command.
TODO(jbudorick) Switch the default value of check_return to True after
AndroidCommands is gone.
The command to run |cmd| should be a sequence of program arguments or else
a single string.
When |cmd| is a sequence, it is assumed to contain the name of the command
to run followed by its arguments. In this case, arguments are passed to the
command exactly as given, without any further processing by the shell. This
allows to easily pass arguments containing spaces or special characters
without having to worry about getting quoting right. Whenever possible, it
is recomended to pass |cmd| as a sequence.
When |cmd| is given as a string, it will be interpreted and run by the
shell on the device.
This behaviour is consistent with that of command runners in cmd_helper as
well as Python's own subprocess.Popen.
TODO(perezju) Change the default of |check_return| to True when callers
have switched to the new behaviour.
Args:
cmd: A list containing the command to run on the device and any arguments.
cmd: A string with the full command to run on the device, or a sequence
containing the command and its arguments.
check_return: A boolean indicating whether or not the return code should
be checked.
as_root: A boolean indicating whether the shell command should be run
with root privileges.
cwd: The device directory in which the command should be run.
env: The environment variables with which the command should be run.
as_root: A boolean indicating whether the shell command should be run
with root privileges.
single_line: A boolean indicating if a single line of output is expected,
and the caller wants to retrieve the value of that line. The default
behaviour is to return a list of output lines.
timeout: timeout in seconds
retries: number of retries
Returns:
The output of the command.
The output of the command either as list of lines or, when single_line is
True, the value contained in the single expected line of output.
Raises:
CommandFailedError if check_return is True and the return code is nozero.
AdbShellCommandFailedError if check_return is True and the exit code of
the command run on the device is non-zero.
CommandFailedError if single_line is True but the output consists of
either zero or more than one lines.
CommandTimeoutError on timeout.
DeviceUnreachableError on missing device.
"""
return self._RunShellCommandImpl(
cmd, check_return=check_return, as_root=as_root, cwd=cwd, env=env,
timeout=timeout)
def _RunShellCommandImpl(self, cmd, check_return=False, as_root=False,
cwd=None, env=None, timeout=None):
# TODO(jbudorick): Remove the timeout parameter once this is no longer
# backed by AndroidCommands.
if isinstance(cmd, list):
cmd = ' '.join(cmd)
return self._RunShellCommandImpl(cmd, check_return=check_return, cwd=cwd,
env=env, as_root=as_root, single_line=single_line)
def _RunShellCommandImpl(self, cmd, check_return=False, cwd=None, env=None,
as_root=False, single_line=False):
def env_quote(key, value):
if not DeviceUtils._VALID_SHELL_VARIABLE.match(key):
raise KeyError('Invalid shell variable name %r' % key)
# using double quotes here to allow interpolation of shell variables
return '%s=%s' % (key, cmd_helper.DoubleQuote(value))
if not isinstance(cmd, basestring):
cmd = ' '.join(cmd_helper.SingleQuote(s) for s in cmd)
if as_root and not self._HasRootImpl():
cmd = 'su -c %s' % cmd
if env:
cmd = '%s %s' % (
' '.join('%s=%s' % (k, v) for k, v in env.iteritems()), cmd)
env = ' '.join(env_quote(k, v) for k, v in env.iteritems())
cmd = '%s %s' % (env, cmd)
if cwd:
cmd = 'cd %s && %s' % (cwd, cmd)
cmd = 'cd %s && %s' % (cmd_helper.SingleQuote(cwd), cmd)
try:
# TODO(perezju) still need to make sure that we call a version of
# adb.Shell without a timeout-and-retries wrapper.
output = self.adb.Shell(cmd, expect_rc=0)
except device_errors.AdbShellCommandFailedError as e:
if check_return:
code, output = self.old_interface.GetShellCommandStatusAndOutput(
cmd, timeout_time=timeout)
if int(code) != 0:
raise device_errors.AdbCommandFailedError(
cmd.split(), 'Nonzero exit code (%d)' % code, device=str(self))
raise
else:
output = e.output
output = output.splitlines()
if single_line:
if len(output) != 1:
msg = 'exactly one line of output expected, but got: %s'
raise device_errors.CommandFailedError(msg % output)
return output[0]
else:
output = self.old_interface.RunShellCommand(cmd, timeout_time=timeout)
return output
@decorators.WithTimeoutAndRetriesFromInstance()
......@@ -371,8 +423,8 @@ class DeviceUtils(object):
raise device_errors.CommandFailedError(
'No process "%s"' % process_name, device=str(self))
cmd = 'kill -%d %s' % (signum, ' '.join(pids.values()))
self._RunShellCommandImpl(cmd, as_root=as_root)
cmd = ['kill', '-%d' % signum] + pids.values()
self._RunShellCommandImpl(cmd, as_root=as_root, check_return=True)
if blocking:
wait_period = 0.1
......@@ -523,8 +575,7 @@ class DeviceUtils(object):
files = []
for h, d in host_device_tuples:
if os.path.isdir(h):
self._RunShellCommandImpl(['mkdir', '-p', '"%s"' % d],
check_return=True)
self._RunShellCommandImpl(['mkdir', '-p', d], check_return=True)
files += self._GetChangedFilesImpl(h, d)
if not files:
......@@ -558,14 +609,13 @@ class DeviceUtils(object):
self._PushChangedFilesZipped(files)
self._RunShellCommandImpl(
['chmod', '-R', '777'] + [d for _, d in host_device_tuples],
as_root=True)
as_root=True, check_return=True)
def _GetChangedFilesImpl(self, host_path, device_path):
real_host_path = os.path.realpath(host_path)
try:
real_device_path = self._RunShellCommandImpl(
['realpath', device_path], check_return=True)
real_device_path = real_device_path[0]
['realpath', device_path], single_line=True, check_return=True)
except device_errors.CommandFailedError:
return [(host_path, device_path)]
......@@ -657,13 +707,14 @@ class DeviceUtils(object):
self.adb.Push(zip_file.name, zip_on_device)
self._RunShellCommandImpl(
['unzip', zip_on_device],
as_root=True, check_return=True,
env={'PATH': '$PATH:%s' % install_commands.BIN_DIR})
as_root=True,
env={'PATH': '$PATH:%s' % install_commands.BIN_DIR},
check_return=True)
finally:
if zip_proc.is_alive():
zip_proc.terminate()
if self._IsOnlineImpl():
self._RunShellCommandImpl(['rm', zip_on_device])
self._RunShellCommandImpl(['rm', zip_on_device], check_return=True)
@staticmethod
def _CreateDeviceZip(zip_path, host_device_tuples):
......@@ -799,8 +850,9 @@ class DeviceUtils(object):
CommandTimeoutError on timeout.
DeviceUnreachableError on missing device.
"""
self._RunShellCommandImpl('echo {1} > {0}'.format(device_path,
pipes.quote(text)), check_return=True, as_root=as_root)
cmd = 'echo %s > %s' % (cmd_helper.SingleQuote(text),
cmd_helper.SingleQuote(device_path))
self._RunShellCommandImpl(cmd, as_root=as_root, check_return=True)
@decorators.WithTimeoutAndRetriesFromInstance()
def Ls(self, device_path, timeout=None, retries=None):
......@@ -917,7 +969,7 @@ class DeviceUtils(object):
def _GetPidsImpl(self, process_name):
procs_pids = {}
for line in self._RunShellCommandImpl('ps'):
for line in self._RunShellCommandImpl('ps', check_return=True):
try:
ps_data = line.split()
if process_name in ps_data[-1]:
......
......@@ -218,6 +218,73 @@ class DeviceUtilsOldImplTest(unittest.TestCase):
'0123456789abcdef', default_timeout=1, default_retries=0)
class Args:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def __eq__(self, other):
return (self.args, self.kwargs) == (other.args, other.kwargs)
def __repr__(self):
return '%s(%s)' % (type(self).__name__, str(self))
def __str__(self):
toks = (['%r' % v for v in self.args] +
['%s=%r' % (k, self.kwargs[k]) for k in sorted(self.kwargs)])
return ', '.join(toks)
class MockCallSequence(object):
def __init__(self, test_case, obj, method, calls):
def assert_and_return(*args, **kwargs):
received_args = Args(*args, **kwargs)
test_case.assertTrue(
self._calls,
msg=('Unexpected call\n'
' received: %s(%s)\n' % (self._method, received_args)))
expected_args, return_value = self._calls.pop(0)
test_case.assertTrue(
received_args == expected_args,
msg=('Call does not match expected args\n'
' received: %s(%s)\n'
' expected: %s(%s)\n'
% (self._method, received_args,
self._method, expected_args)))
if isinstance(return_value, Exception):
raise return_value
else:
return return_value
self._calls = list(calls)
self._test_case = test_case
self._method = method
self._patched = mock.patch.object(obj, self._method,
side_effect=assert_and_return)
def __enter__(self):
return self._patched.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
self._patched.__exit__(exc_type, exc_val, exc_tb)
if exc_type is None:
missing = ''.join(' expected: %s(%s)\n'
% (self._method, expected_args)
for expected_args, _ in self._calls)
self._test_case.assertTrue(
not missing,
msg=('Expected calls not found\n' + missing))
class _ShellError:
def __init__(self, output=None, return_code=1):
if output is None:
self.output = 'Permission denied\r\n'
else:
self.output = output
self.return_code = return_code
class DeviceUtilsNewImplTest(unittest.TestCase):
def setUp(self):
......@@ -228,6 +295,28 @@ class DeviceUtilsNewImplTest(unittest.TestCase):
self.device = device_utils.DeviceUtils(
self.adb, default_timeout=1, default_retries=0)
def assertShellCallSequence(self, calls):
'''Assert that we expect a sequence of calls to adb.Shell.
Args:
calls: a sequence of (cmd, return_value) pairs, where |cmd| is the
expected shell command to run on the device (with any quoting already
applied), and |return_value| is either a string to give as mock output
or a _ShellError object to raise an AdbShellCommandFailedError.
'''
def mk_expected_call(cmd, return_value):
expected_args = Args(cmd, expect_rc=0)
if isinstance(return_value, _ShellError):
return_value = device_errors.AdbShellCommandFailedError(cmd,
return_value.return_code, return_value.output, str(self.device))
return (expected_args, return_value)
expected_calls = (mk_expected_call(a, r) for a, r in calls)
return MockCallSequence(self, self.adb, 'Shell', expected_calls)
def assertShellCall(self, cmd, return_value=''):
return self.assertShellCallSequence([(cmd, return_value)])
class DeviceUtilsHybridImplTest(DeviceUtilsOldImplTest):
......@@ -248,16 +337,14 @@ class DeviceUtilsIsOnlineTest(DeviceUtilsOldImplTest):
self.assertFalse(self.device.IsOnline())
class DeviceUtilsHasRootTest(DeviceUtilsOldImplTest):
class DeviceUtilsHasRootTest(DeviceUtilsNewImplTest):
def testHasRoot_true(self):
with self.assertCalls("adb -s 0123456789abcdef shell 'ls /root'",
'foo\r\n'):
with self.assertShellCall('ls /root', 'foo\r\n'):
self.assertTrue(self.device.HasRoot())
def testHasRoot_false(self):
with self.assertCalls("adb -s 0123456789abcdef shell 'ls /root'",
'Permission denied\r\n'):
with self.assertShellCall('ls /root', _ShellError()):
self.assertFalse(self.device.HasRoot())
......@@ -303,19 +390,17 @@ class DeviceUtilsIsUserBuildTest(DeviceUtilsOldImplTest):
self.assertFalse(self.device.IsUserBuild())
class DeviceUtilsGetExternalStoragePathTest(DeviceUtilsOldImplTest):
class DeviceUtilsGetExternalStoragePathTest(DeviceUtilsNewImplTest):
def testGetExternalStoragePath_succeeds(self):
fakeStoragePath = '/fake/storage/path'
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo $EXTERNAL_STORAGE'",
with self.assertShellCall('echo $EXTERNAL_STORAGE',
'%s\r\n' % fakeStoragePath):
self.assertEquals(fakeStoragePath,
self.device.GetExternalStoragePath())
def testGetExternalStoragePath_fails(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo $EXTERNAL_STORAGE'", '\r\n'):
with self.assertShellCall('echo $EXTERNAL_STORAGE', '\r\n'):
with self.assertRaises(device_errors.CommandFailedError):
self.device.GetExternalStoragePath()
......@@ -544,101 +629,156 @@ class DeviceUtilsInstallTest(DeviceUtilsOldImplTest):
self.device.Install('/fake/test/app.apk', retries=0)
class DeviceUtilsRunShellCommandTest(DeviceUtilsOldImplTest):
class DeviceUtilsRunShellCommandTest(DeviceUtilsNewImplTest):
def testRunShellCommand_commandAsList(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'pm list packages'",
'pacakge:android\r\n'):
with self.assertShellCall('pm list packages'):
self.device.RunShellCommand(['pm', 'list', 'packages'])
def testRunShellCommand_commandAsListQuoted(self):
with self.assertShellCall("echo 'hello world' '$10'"):
self.device.RunShellCommand(['echo', 'hello world', '$10'])
def testRunShellCommand_commandAsString(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'dumpsys wifi'",
'Wi-Fi is enabled\r\n'):
self.device.RunShellCommand('dumpsys wifi')
with self.assertShellCall('echo "$VAR"'):
self.device.RunShellCommand('echo "$VAR"')
def testNewRunShellImpl_withEnv(self):
with self.assertShellCall('VAR=some_string echo "$VAR"'):
self.device.RunShellCommand('echo "$VAR"', env={'VAR': 'some_string'})
def testNewRunShellImpl_withEnvQuoted(self):
with self.assertShellCall('PATH="$PATH:/other/path" run_this'):
self.device.RunShellCommand('run_this', env={'PATH': '$PATH:/other/path'})
def testNewRunShellImpl_withEnv_failure(self):
with self.assertRaises(KeyError):
self.device.RunShellCommand('some_cmd', env={'INVALID NAME': 'value'})
def testNewRunShellImpl_withCwd(self):
with self.assertShellCall('cd /some/test/path && ls'):
self.device.RunShellCommand('ls', cwd='/some/test/path')
def testNewRunShellImpl_withCwdQuoted(self):
with self.assertShellCall("cd '/some test/path with/spaces' && ls"):
self.device.RunShellCommand('ls', cwd='/some test/path with/spaces')
def testRunShellCommand_withSu(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ls /root'", 'Permission denied\r\n'),
("adb -s 0123456789abcdef shell 'su -c setprop service.adb.root 0'",
'')]):
with self.assertShellCallSequence([
('ls /root', _ShellError()),
('su -c setprop service.adb.root 0', '')]):
self.device.RunShellCommand('setprop service.adb.root 0', as_root=True)
def testRunShellCommand_withRoot(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ls /root'", 'hello\r\nworld\r\n'),
("adb -s 0123456789abcdef shell 'setprop service.adb.root 0'", '')]):
with self.assertShellCallSequence([
('ls /root', '\r\n'),
('setprop service.adb.root 0', '')]):
self.device.RunShellCommand('setprop service.adb.root 0', as_root=True)
def testRunShellCommand_manyLines(self):
cmd = 'ls /some/path'
with self.assertShellCall(cmd, 'file1\r\nfile2\r\nfile3\r\n'):
self.assertEquals(['file1', 'file2', 'file3'],
self.device.RunShellCommand(cmd))
def testRunShellCommand_singleLine_success(self):
cmd = 'echo $VALUE'
with self.assertShellCall(cmd, 'some value\r\n'):
self.assertEquals('some value',
self.device.RunShellCommand(cmd, single_line=True))
def testRunShellCommand_singleLine_successEmptyLine(self):
cmd = 'echo $VALUE'
with self.assertShellCall(cmd, '\r\n'):
self.assertEquals('',
self.device.RunShellCommand(cmd, single_line=True))
def testRunShellCommand_singleLine_successWithoutEndLine(self):
cmd = 'echo -n $VALUE'
with self.assertShellCall(cmd, 'some value'):
self.assertEquals('some value',
self.device.RunShellCommand(cmd, single_line=True))
def testRunShellCommand_singleLine_failNoLines(self):
cmd = 'echo $VALUE'
with self.assertShellCall(cmd, ''):
with self.assertRaises(device_errors.CommandFailedError):
self.device.RunShellCommand(cmd, single_line=True)
def testRunShellCommand_singleLine_failTooManyLines(self):
cmd = 'echo $VALUE'
with self.assertShellCall(cmd, 'some value\r\nanother value\r\n'):
with self.assertRaises(device_errors.CommandFailedError):
self.device.RunShellCommand(cmd, single_line=True)
def testRunShellCommand_checkReturn_success(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo $ANDROID_DATA; echo %$?'",
'/data\r\n%0\r\n'):
self.device.RunShellCommand('echo $ANDROID_DATA', check_return=True)
cmd = 'echo $ANDROID_DATA'
output = '/data\r\n'
with self.assertShellCall(cmd, output):
self.assertEquals([output.rstrip()],
self.device.RunShellCommand(cmd, check_return=True))
def testRunShellCommand_checkReturn_failure(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo $ANDROID_DATA; echo %$?'",
'\r\n%1\r\n'):
with self.assertRaises(device_errors.CommandFailedError):
self.device.RunShellCommand('echo $ANDROID_DATA', check_return=True)
cmd = 'ls /root'
output = 'opendir failed, Permission denied\r\n'
with self.assertShellCall(cmd, _ShellError(output)):
with self.assertRaises(device_errors.AdbShellCommandFailedError):
self.device.RunShellCommand(cmd, check_return=True)
def testRunShellCommand_checkReturn_disabled(self):
cmd = 'ls /root'
output = 'opendir failed, Permission denied\r\n'
with self.assertShellCall(cmd, _ShellError(output)):
self.assertEquals([output.rstrip()],
self.device.RunShellCommand(cmd, check_return=False))
class DeviceUtilsKillAllTest(DeviceUtilsOldImplTest):
class DeviceUtilsKillAllTest(DeviceUtilsNewImplTest):
def testKillAll_noMatchingProcesses(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'):
output = 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
with self.assertShellCallSequence([('ps', output)]):
with self.assertRaises(device_errors.CommandFailedError):
self.device.KillAll('test_process')
def testKillAll_nonblocking(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
with self.assertShellCallSequence([
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'u0_a1 1234 174 123456 54321 ffffffff 456789ab '
'this.is.a.test.process\r\n'),
("adb -s 0123456789abcdef shell 'kill -9 1234'", '')]):
('kill -9 1234', '')]):
self.assertEquals(1,
self.device.KillAll('this.is.a.test.process', blocking=False))
def testKillAll_blocking(self):
with mock.patch('time.sleep'):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
with self.assertShellCallSequence([
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'u0_a1 1234 174 123456 54321 ffffffff 456789ab '
'this.is.a.test.process\r\n'),
("adb -s 0123456789abcdef shell 'kill -9 1234'", ''),
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
('kill -9 1234', ''),
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'u0_a1 1234 174 123456 54321 ffffffff 456789ab '
'this.is.a.test.process\r\n'),
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n')]):
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n')]):
self.assertEquals(1,
self.device.KillAll('this.is.a.test.process', blocking=True))
def testKillAll_root(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
with self.assertShellCallSequence([
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'u0_a1 1234 174 123456 54321 ffffffff 456789ab '
'this.is.a.test.process\r\n'),
("adb -s 0123456789abcdef shell 'ls /root'", 'Permission denied\r\n'),
("adb -s 0123456789abcdef shell 'su -c kill -9 1234'", '')]):
('ls /root', _ShellError()),
('su -c kill -9 1234', '')]):
self.assertEquals(1,
self.device.KillAll('this.is.a.test.process', as_root=True))
def testKillAll_sigterm(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ps'",
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
with self.assertShellCallSequence([
('ps', 'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'u0_a1 1234 174 123456 54321 ffffffff 456789ab '
'this.is.a.test.process\r\n'),
("adb -s 0123456789abcdef shell 'kill -15 1234'", '')]):
('kill -15 1234', '')]):
self.assertEquals(1,
self.device.KillAll('this.is.a.test.process', signum=signal.SIGTERM))
......@@ -944,10 +1084,11 @@ class DeviceUtilsPushChangedFilesZippedTest(DeviceUtilsHybridImplTest):
self.assertEqual(2, self.device._RunShellCommandImpl.call_count)
self.device._RunShellCommandImpl.assert_any_call(
['unzip', '/test/device/external_dir/tmp.zip'],
as_root=True, check_return=True,
env={'PATH': '$PATH:/data/local/tmp/bin'})
as_root=True,
env={'PATH': '$PATH:/data/local/tmp/bin'},
check_return=True)
self.device._RunShellCommandImpl.assert_any_call(
['rm', '/test/device/external_dir/tmp.zip'])
['rm', '/test/device/external_dir/tmp.zip'], check_return=True)
def testPushChangedFilesZipped_multiple(self):
test_files = [('/test/host/path/file1', '/test/device/path/file1'),
......@@ -971,10 +1112,11 @@ class DeviceUtilsPushChangedFilesZippedTest(DeviceUtilsHybridImplTest):
self.assertEqual(2, self.device._RunShellCommandImpl.call_count)
self.device._RunShellCommandImpl.assert_any_call(
['unzip', '/test/device/external_dir/tmp.zip'],
as_root=True, check_return=True,
env={'PATH': '$PATH:/data/local/tmp/bin'})
as_root=True,
env={'PATH': '$PATH:/data/local/tmp/bin'},
check_return=True)
self.device._RunShellCommandImpl.assert_any_call(
['rm', '/test/device/external_dir/tmp.zip'])
['rm', '/test/device/external_dir/tmp.zip'], check_return=True)
class DeviceUtilsFileExistsTest(DeviceUtilsOldImplTest):
......@@ -1202,44 +1344,22 @@ class DeviceUtilsWriteFileTest(DeviceUtilsOldImplTest):
self.device.WriteFile('/test/file/no.permissions.to.write',
'new test file contents', as_root=True)
class DeviceUtilsWriteTextFileTest(DeviceUtilsOldImplTest):
class DeviceUtilsWriteTextFileTest(DeviceUtilsNewImplTest):
def testWriteTextFileTest_basic(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo some.string"
" > /test/file/to.write; echo %$?'", '%0\r\n'):
with self.assertShellCall('echo some.string > /test/file/to.write'):
self.device.WriteTextFile('/test/file/to.write', 'some.string')
def testWriteTextFileTest_stringWithSpaces(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'echo '\\''some other string'\\''"
" > /test/file/to.write; echo %$?'", '%0\r\n'):
self.device.WriteTextFile('/test/file/to.write', 'some other string')
def testWriteTextFileTest_asRoot_withSu(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ls /root'", 'Permission denied\r\n'),
("adb -s 0123456789abcdef shell 'su -c echo some.string"
" > /test/file/to.write; echo %$?'", '%0\r\n')]):
self.device.WriteTextFile('/test/file/to.write', 'some.string',
as_root=True)
def testWriteTextFileTest_quoted(self):
with self.assertShellCall(
"echo 'some other string' > '/test/file/to write'"):
self.device.WriteTextFile('/test/file/to write', 'some other string')
def testWriteTextFileTest_asRoot_withRoot(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ls /root'", 'hello\r\nworld\r\n'),
("adb -s 0123456789abcdef shell 'echo some.string"
" > /test/file/to.write; echo %$?'", '%0\r\n')]):
self.device.WriteTextFile('/test/file/to.write', 'some.string',
as_root=True)
def testWriteTextFileTest_asRoot_rejected(self):
with self.assertCallsSequence([
("adb -s 0123456789abcdef shell 'ls /root'", 'Permission denied\r\n'),
("adb -s 0123456789abcdef shell 'su -c echo some.string"
" > /test/file/to.write; echo %$?'", '%1\r\n')]):
with self.assertRaises(device_errors.CommandFailedError):
self.device.WriteTextFile('/test/file/to.write', 'some.string',
as_root=True)
def testWriteTextFileTest_asRoot(self):
with self.assertShellCallSequence([
('ls /root', _ShellError()),
('su -c echo string > /test/file', '')]):
self.device.WriteTextFile('/test/file', 'string', as_root=True)
class DeviceUtilsLsTest(DeviceUtilsOldImplTest):
......@@ -1396,26 +1516,26 @@ class DeviceUtilsSetPropTest(DeviceUtilsOldImplTest):
self.device.SetProp('this.is.a.test.property', 'test_property_value')
class DeviceUtilsGetPidsTest(DeviceUtilsOldImplTest):
class DeviceUtilsGetPidsTest(DeviceUtilsNewImplTest):
def testGetPids_noMatches(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'ps'",
with self.assertShellCall(
'ps',
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'user 1000 100 1024 1024 ffffffff 00000000 no.match\r\n'):
self.assertEqual({}, self.device.GetPids('does.not.match'))
def testGetPids_oneMatch(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'ps'",
with self.assertShellCall(
'ps',
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'user 1000 100 1024 1024 ffffffff 00000000 not.a.match\r\n'
'user 1001 100 1024 1024 ffffffff 00000000 one.match\r\n'):
self.assertEqual({'one.match': '1001'}, self.device.GetPids('one.match'))
def testGetPids_mutlipleMatches(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'ps'",
with self.assertShellCall(
'ps',
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'user 1000 100 1024 1024 ffffffff 00000000 not\r\n'
'user 1001 100 1024 1024 ffffffff 00000000 one.match\r\n'
......@@ -1426,8 +1546,8 @@ class DeviceUtilsGetPidsTest(DeviceUtilsOldImplTest):
self.device.GetPids('match'))
def testGetPids_exactMatch(self):
with self.assertCalls(
"adb -s 0123456789abcdef shell 'ps'",
with self.assertShellCall(
'ps',
'USER PID PPID VSIZE RSS WCHAN PC NAME\r\n'
'user 1000 100 1024 1024 ffffffff 00000000 not.exact.match\r\n'
'user 1234 100 1024 1024 ffffffff 00000000 exact.match\r\n'):
......
......@@ -358,8 +358,8 @@ class TestRunner(base_test_runner.BaseTestRunner):
cmd = ['am', 'instrument', '-r']
for k, v in self._GetInstrumentationArgs().iteritems():
cmd.extend(['-e', k, "'%s'" % v])
cmd.extend(['-e', 'class', "'%s'" % test])
cmd.extend(['-e', k, v])
cmd.extend(['-e', 'class', test])
cmd.extend(['-w', instrumentation_path])
return self.device.RunShellCommand(cmd, timeout=timeout, retries=0)
......
......@@ -261,8 +261,8 @@ class InstrumentationTestRunnerTest(unittest.TestCase):
self.instance._RunTest('test.package.TestClass#testMethod', 100)
self.instance.device.RunShellCommand.assert_called_with(
['am', 'instrument', '-r',
'-e', 'test_arg_key', "'test_arg_value'",
'-e', 'class', "'test.package.TestClass#testMethod'",
'-e', 'test_arg_key', 'test_arg_value',
'-e', 'class', 'test.package.TestClass#testMethod',
'-w', 'test.package/MyTestRunner'],
timeout=100, retries=0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment