Commit f415e92a authored by Takuto Ikuta's avatar Takuto Ikuta Committed by Commit Bot

testing: remove legion related scripts

This is to remove reference to python swarming client which is being
deprecated.

testing/legion is not referenced other than example_test_controller.py
https://cs.chromium.org/search/?q=legion+-f:%5Esrc/testing/legion&sq=package:chromium&type=cs

example_test_controller is not referenced other than legion
https://cs.chromium.org/search/?q=example_test_controller&sq=package:chromium&type=cs

Bug: 984869
Change-Id: I9929893154ad35b27c9df1e8266fe1d6f4e22d89
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2053644
Auto-Submit: Takuto Ikuta <tikuta@chromium.org>
Reviewed-by: default avatarYoshisato Yanagisawa <yyanagisawa@google.com>
Reviewed-by: default avatarJoe Downing <joedow@chromium.org>
Reviewed-by: default avatarJohn Budorick <jbudorick@chromium.org>
Commit-Queue: Takuto Ikuta <tikuta@chromium.org>
Commit-Queue: John Budorick <jbudorick@chromium.org>
Cr-Commit-Position: refs/heads/master@{#742853}
parent b8cc0097
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The test controller for the chromoting localhost browser_tests.
This test uses the legion framework to setup this controller which will run
the chromoting_browser_tests on a task machine. This is intended to
be an example Legion-based test for the chromoting team.
The controller will start a task machine to run browser_tests_launcher on. The
output of these tests are streamed back to the test controller to be output
on the controller's stdout and stderr channels. The final test output is then
read and becomes the final output of the controller, mirroring the test's
pass/fail result.
"""
import argparse
import logging
import os
import sys
import time
# Map the legion directory so we can import the host controller.
SRC_DIR = os.path.join('..', '..', '..')
sys.path.append(os.path.join(SRC_DIR, 'testing'))
from legion import test_controller
class ExampleController(test_controller.TestController):
"""The test controller for the Chromoting browser_tests."""
def __init__(self):
super(ExampleController, self).__init__()
self.task = None
self.args = None
def RunTest(self):
"""Main method to run the test code."""
self.ParseArgs()
self.CreateTask()
self.TestIntegrationTests()
def CreateBrowserTestsLauncherCommand(self):
# This command is run on the Swarming bot.
return [
'python',
self.TaskAbsPath('../browser_tests_launcher.py'),
'--commands_file', self.TaskAbsPath(self.args.commands_file),
'--prod_dir', self.TaskAbsPath(self.args.prod_dir),
'--cfg_file', self.TaskAbsPath(self.args.cfg_file),
'--me2me_manifest_file', self.TaskAbsPath(
self.args.me2me_manifest_file),
'--it2me_manifest_file', self.TaskAbsPath(
self.args.it2me_manifest_file),
'--user_profile_dir', self.args.user_profile_dir,
]
def TaskAbsPath(self, path):
"""Returns the absolute path to the resource on the task machine.
Args:
path: The relative path to the resource.
Since the test controller and the task machines run in different tmp dirs
on different machines the absolute path cannot be calculated correctly on
this machine. This function maps the relative path (from this directory)
to an absolute path on the task machine.
"""
return self.task.rpc.AbsPath(path)
def CreateTask(self):
"""Creates a task object and sets the proper values."""
self.task = self.CreateNewTask(
isolated_hash=self.args.task_machine,
dimensions={'os': 'Ubuntu-14.04', 'pool': 'Chromoting'})
self.task.Create()
self.task.WaitForConnection()
def ParseArgs(self):
"""Gets the command line args."""
parser = argparse.ArgumentParser()
parser.add_argument('--task_machine',
help='isolated hash of the task machine.')
# The rest of the args are taken from
# testing/chromoting/browser_tests_launcher.py.
parser.add_argument('-f', '--commands_file',
help='path to file listing commands to be launched.')
parser.add_argument('-p', '--prod_dir',
help='path to folder having product and test binaries.')
parser.add_argument('-c', '--cfg_file',
help='path to test host config file.')
parser.add_argument('--me2me_manifest_file',
help='path to me2me host manifest file.')
parser.add_argument('--it2me_manifest_file',
help='path to it2me host manifest file.')
parser.add_argument(
'-u', '--user_profile_dir',
help='path to user-profile-dir, used by connect-to-host tests.')
self.args, _ = parser.parse_known_args()
def TestIntegrationTests(self):
"""Runs the integration tests via browser_tests_launcher.py."""
# Create a process object, configure it, and start it.
# All interactions with the process are based on this "proc" key.
proc = self.task.rpc.subprocess.Process(
self.CreateBrowserTestsLauncherCommand())
# Set the cwd to browser_tests_launcher relative to this directory.
# This allows browser_test_launcher to use relative paths.
self.task.rpc.subprocess.SetCwd(proc, '../')
# Set the task verbosity to true to allow stdout/stderr to be echo'ed to
# run_task's stdout/stderr on the task machine. This can assist in
# debugging.
self.task.rpc.subprocess.SetVerbose(proc)
# Set the process as detached to create it in a new process group.
self.task.rpc.subprocess.SetDetached(proc)
# Start the actual process on the task machine.
self.task.rpc.subprocess.Start(proc)
# Collect the stdout/stderr and emit it from this controller while the
# process is running.
while self.task.rpc.subprocess.Poll(proc) is None:
# Output the test's stdout and stderr in semi-realtime.
# This is not true realtime due to the RPC calls and the 1s sleep.
stdout, stderr = self.task.rpc.subprocess.ReadOutput(proc)
if stdout:
sys.stdout.write(stdout)
if stderr:
sys.stderr.write(stderr)
time.sleep(1)
# Get the return code, clean up the process object.
returncode = self.task.rpc.subprocess.GetReturncode(proc)
self.task.rpc.subprocess.Delete(proc)
# Pass or fail depending on the return code from the browser_tests_launcher.
if returncode != 0:
raise AssertionError('browser_tests_launcher failed with return code '
'%i' % returncode)
if __name__ == '__main__':
ExampleController().RunController()
mmeade@chromium.org
bgoldman@chromium.org
chaitali@chromium.org
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#!/usr/bin/env python
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Legion-based comm server test."""
import argparse
import logging
import os
import sys
# Map the testing directory so we can import legion.legion_test_case.
TESTING_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'..', '..', '..', '..', 'testing')
sys.path.append(TESTING_DIR)
from legion import legion_test_case
from legion.lib import common_lib
class CommServerTestController(legion_test_case.TestCase):
"""A simple example controller for a test."""
@classmethod
def CreateTestTask(cls):
"""Create a new task."""
parser = argparse.ArgumentParser()
parser.add_argument('--task-hash')
parser.add_argument('--os', default='Ubuntu-14.04')
args, _ = parser.parse_known_args()
task = cls.CreateTask(
isolated_hash=args.task_hash,
dimensions={'os': args.os, 'pool': 'default'},
idle_timeout_secs=90,
connection_timeout_secs=90,
verbosity=logging.DEBUG)
task.Create()
return task
@classmethod
def setUpClass(cls):
"""Creates the task machines and waits until they connect."""
cls.task = cls.CreateTestTask()
cls.task.WaitForConnection()
def testCommServerTest(self):
# This command is run on the Swarming bot.
cmd = [
'python',
'task.py',
'--address', str(common_lib.MY_IP),
'--port', str(self.comm_server.port)
]
process = self.task.Process(cmd)
process.Wait()
retcode = process.GetReturncode()
if retcode != 0:
logging.info('STDOUT:\n%s', process.ReadStdout())
logging.info('STDERR:\n%s', process.ReadStderr())
self.assertEqual(retcode, 0)
# Add a success logging statement to make the logs a little easier to read.
logging.info('Success')
if __name__ == '__main__':
legion_test_case.main()
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Task-based unittest for the Legion event server."""
import argparse
import httplib
import sys
import unittest
class CommServerTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
super(CommServerTest, self).__init__(*args, **kwargs)
parser = argparse.ArgumentParser()
parser.add_argument('--address')
parser.add_argument('--port', type=int)
self.args, _ = parser.parse_known_args()
def Connect(self, verb, path, message=''):
conn = httplib.HTTPConnection(self.args.address, self.args.port)
conn.request(verb, path, body=message)
return conn.getresponse()
def testMessagesUsedAsSignals(self):
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 404)
self.assertEquals(
self.Connect('PUT', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message1').status, 404)
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 404)
def testErrors(self):
for verb in ['GET', 'PUT', 'DELETE']:
self.assertEquals(
self.Connect(verb, '/').status, 403)
self.assertEquals(
self.Connect(verb, '/foobar').status, 403)
self.assertEquals(
self.Connect(verb, '/foobar/').status, 405)
def testMessagePassing(self):
self.assertEquals(
self.Connect('GET', '/messages/message2').status, 404)
self.assertEquals(
self.Connect('PUT', '/messages/message2', 'foo').status, 200)
self.assertEquals(
self.Connect('GET', '/messages/message2').read(), 'foo')
self.assertEquals(
self.Connect('DELETE', '/messages/message2').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message2').status, 404)
self.assertEquals(
self.Connect('GET', '/messages/message2').status, 404)
if __name__ == '__main__':
unittest.main(argv=sys.argv[:1])
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A simple host test module.
This module runs on the host machine and is responsible for creating 2
task machines, waiting for them, and running RPC calls on them.
"""
import argparse
import logging
import os
import sys
import time
# Map the testing directory so we can import legion.legion_test.
TESTING_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'..', '..', '..', '..', 'testing')
sys.path.append(TESTING_DIR)
from legion import legion_test_case
class ExampleTestController(legion_test_case.TestCase):
"""A simple example controller for a test."""
@classmethod
def CreateTestTask(cls):
"""Create a new task."""
parser = argparse.ArgumentParser()
parser.add_argument('--task-hash')
parser.add_argument('--os', default='Ubuntu-14.04')
args, _ = parser.parse_known_args()
task = cls.CreateTask(
isolated_hash=args.task_hash,
dimensions={'os': args.os},
idle_timeout_secs=90,
connection_timeout_secs=90,
verbosity=logging.DEBUG)
task.Create()
return task
@classmethod
def setUpClass(cls):
"""Creates the task machines and waits until they connect."""
cls.task1 = cls.CreateTestTask()
cls.task2 = cls.CreateTestTask()
cls.task1.WaitForConnection()
cls.task2.WaitForConnection()
def testCallEcho(self):
"""Tests rpc.Echo on a task."""
logging.info('Calling Echo on %s', self.task2.name)
self.assertEqual(self.task2.rpc.Echo('foo'), 'echo foo')
def testLaunchTaskBinary(self):
"""Call task_test.py 'name' on the tasks."""
self.VerifyTaskBinaryLaunched(self.task1)
self.VerifyTaskBinaryLaunched(self.task2)
def VerifyTaskBinaryLaunched(self, task):
logging.info(
'Calling Process to run "task_test.py %s"', task.name)
proc = task.Process(['python', 'task_test.py', task.name])
proc.Wait()
self.assertEqual(proc.GetReturncode(), 0)
self.assertIn(task.name, proc.ReadStdout())
self.assertEquals(proc.ReadStderr(), '')
proc.Delete()
if __name__ == '__main__':
legion_test_case.main()
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A simple client test module.
This module is invoked by the host by calling the client controller's
Subprocess RPC method. The name is passed in as a required argument on the
command line.
"""
import argparse
import os
import sys
def main():
parser = argparse.ArgumentParser()
parser.add_argument('name')
args = parser.parse_args()
print 'Hello world from', args.name
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import httplib
import sys
def GetArgs():
"""Returns the specified command line args."""
parser = argparse.ArgumentParser()
parser.add_argument('--server', required=True)
parser.add_argument('--port', required=True, type=int)
return parser.parse_args()
def main():
"""Get the webpage and assert the text == 'SUCCESS!'."""
args = GetArgs()
conn = httplib.HTTPConnection(args.server, args.port)
conn.request('GET', '/')
response = conn.getresponse().read()
assert response == 'SUCCESS!', '%s != SUCCESS!' % response
if __name__ == '__main__':
sys.exit(main())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import SimpleHTTPServer
import SocketServer
import sys
import threading
import time
class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
self.wfile.write('SUCCESS!')
def GetArgs():
"""Returns the specified command line args."""
parser = argparse.ArgumentParser()
parser.add_argument('--port', required=True, type=int)
parser.add_argument('--timeout', type=int, default=60)
return parser.parse_args()
def main():
"""Run a webserver until the process is killed."""
server = None
args = GetArgs()
try:
server = SocketServer.TCPServer(('', args.port), Handler)
thread = threading.Thread(target=server.serve_forever)
thread.start()
start = time.time()
while time.time() < start + args.timeout:
time.sleep(1)
finally:
if server:
server.shutdown()
if __name__ == '__main__':
sys.exit(main())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import logging
import os
import socket
import sys
import time
TESTING_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..', '..', '..')
sys.path.append(TESTING_DIR)
from legion import legion_test_case
class HttpTest(legion_test_case.TestCase):
"""Example HTTP test case."""
@classmethod
def GetArgs(cls):
"""Get command line args."""
parser = argparse.ArgumentParser()
parser.add_argument('--http-server')
parser.add_argument('--http-client')
parser.add_argument('--os', default='Ubuntu-14.04')
args, _ = parser.parse_known_args()
return args
@classmethod
def CreateTask(cls, name, task_hash, os_type):
"""Create a new task."""
#pylint: disable=unexpected-keyword-arg,no-value-for-parameter
#pylint: disable=arguments-differ
task = super(HttpTest, cls).CreateTask(
name=name,
isolated_hash=task_hash,
dimensions={'os': os_type})
task.Create()
return task
@classmethod
def setUpClass(cls):
"""Creates the task machines and waits until they connect."""
args = cls.GetArgs()
cls.http_server = cls.CreateTask(
'http_server', args.http_server, args.os)
cls.http_client = cls.CreateTask(
'http_client', args.http_client, args.os)
cls.http_server.WaitForConnection()
cls.http_client.WaitForConnection()
def CanConnectToServerPort(self, server_port):
"""Connect to a port on the http_server.
Returns:
True if the connection succeeded, False otherwise.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.http_server.ip_address, server_port))
return True
except socket.error:
return False
def FindOpenPortOnServer(self):
"""Find an open port on the server and return it.
Returns:
The value of an open port.
"""
for server_port in xrange(2000, 20000):
if not self.CanConnectToServerPort(server_port):
return server_port
self.fail('Unable to find an open port on the server.')
def StartServer(self, server_port):
"""Starts the http_server process.
Returns:
The server process.
"""
def WaitForServer():
timeout = time.time() + 5
while timeout > time.time():
if self.CanConnectToServerPort(server_port):
return
self.fail('Server process failed to start')
cmd = [
self.http_server.executable,
'http_server.py',
'--port', str(server_port)
]
proc = self.http_server.Process(cmd)
WaitForServer()
return proc
def StartClient(self, server_port):
"""Starts the http_client process.
Returns:
The client process.
"""
cmd = [
self.http_client.executable,
'http_client.py',
'--server', self.http_server.ip_address,
'--port', str(server_port)
]
return self.http_client.Process(cmd)
def testHttpWorks(self):
"""Tests that the client process can talk to the server process."""
server_proc = None
client_proc = None
try:
server_port = self.FindOpenPortOnServer()
logging.info('Starting server at %s:%s', self.http_server.ip_address,
server_port)
server_proc = self.StartServer(server_port)
logging.info('Connecting to server at %s:%s', self.http_server.ip_address,
server_port)
client_proc = self.StartClient(server_port)
client_proc.Wait()
logging.info('client_proc.stdout: %s', client_proc.ReadStdout())
logging.info('client_proc.stderr: %s', client_proc.ReadStderr())
self.assertEqual(client_proc.GetReturncode(), 0)
finally:
if server_proc:
server_proc.Kill()
server_proc.Delete()
if client_proc:
client_proc.Kill()
client_proc.Delete()
if __name__ == '__main__':
legion_test_case.main()
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A host test module demonstrating interacting with remote subprocesses."""
import argparse
import logging
import os
import sys
import time
import xmlrpclib
# Map the testing directory so we can import legion.legion_test.
TESTING_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'..', '..', '..', '..', 'testing')
sys.path.append(TESTING_DIR)
from legion import legion_test_case
from legion.lib.rpc import jsonrpclib
class ExampleTestController(legion_test_case.TestCase):
"""An example controller using the remote subprocess functions."""
# We use python from the command line to mimic ls and sleep due to
# compatibility issues across OSes.
LS = ['python', '-c', 'import os; print os.listdir(".")']
SLEEP10 = ['python', '-c', 'import time; time.sleep(10)']
SLEEP20 = ['python', '-c', 'import time; time.sleep(20)']
@classmethod
def setUpClass(cls):
"""Creates the task machine and waits until it connects."""
parser = argparse.ArgumentParser()
parser.add_argument('--task-hash')
parser.add_argument('--os', default='Ubuntu-14.04')
args, _ = parser.parse_known_args()
cls.task = cls.CreateTask(
isolated_hash=args.task_hash,
dimensions={'os': args.os},
idle_timeout_secs=90,
connection_timeout_secs=90,
verbosity=logging.DEBUG)
cls.task.Create()
cls.task.WaitForConnection()
def testMultipleProcesses(self):
"""Tests that processes can be run and controlled simultaneously."""
start = time.time()
logging.info('Starting "sleep 10" and "sleep 20"')
sleep10 = self.task.Process(self.SLEEP10)
sleep20 = self.task.Process(self.SLEEP20)
logging.info('Waiting for "sleep 10" to finish and verifying timing')
sleep10.Wait()
elapsed = time.time() - start
self.assertGreaterEqual(elapsed, 10)
self.assertLess(elapsed, 11)
logging.info('Waiting for "sleep 20" to finish and verifying timing')
sleep20.Wait()
elapsed = time.time() - start
self.assertGreaterEqual(elapsed, 20)
sleep10.Delete()
sleep20.Delete()
def testTerminate(self):
"""Tests that a process can be correctly terminated."""
start = time.time()
logging.info('Starting "sleep 20"')
sleep20 = self.task.Process(self.SLEEP20)
logging.info('Calling Terminate()')
sleep20.Terminate()
try:
logging.info('Trying to wait for "sleep 20" to complete')
sleep20.Wait()
except xmlrpclib.Fault:
pass
finally:
sleep20.Delete()
logging.info('Checking to make sure "sleep 20" was actually terminated')
self.assertLess(time.time() - start, 20)
def testLs(self):
"""Tests that the returned results from a process are correct."""
ls = self.task.Process(self.LS)
logging.info('Trying to wait for "ls" to complete')
ls.Wait()
self.assertEqual(ls.GetReturncode(), 0)
self.assertIn('task.isolate', ls.ReadStdout())
def testProcessOutput(self):
"""Tests that a process's output gets logged to a file in the output-dir."""
code = ('import sys\n'
'sys.stdout.write("Hello stdout")\n'
'sys.stderr.write("Hello stderr")')
self.task.rpc.WriteFile('test.py', code)
proc = self.task.Process(['python', 'test.py'])
proc.Wait()
self.CheckProcessOutput('stdout', proc.key, 'Hello stdout')
self.CheckProcessOutput('stderr', proc.key, 'Hello stderr')
def testCustomKey(self):
"""Tests that a custom key passed to a process works correctly."""
code = ('import sys\n'
'sys.stdout.write("Hello CustomKey stdout")\n'
'sys.stderr.write("Hello CustomKey stderr")')
self.task.rpc.WriteFile('test.py', code)
proc = self.task.Process(['python', 'test.py'], key='CustomKey')
proc.Wait()
self.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout')
self.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr')
def testKeyReuse(self):
"""Tests that a key cannot be reused."""
self.task.Process(self.LS, key='KeyReuse')
self.assertRaises(jsonrpclib.Fault, self.task.Process, self.LS,
key='KeyReuse')
def CheckProcessOutput(self, pipe, key, expected):
"""Checks that a process' output files are correct."""
logging.info('Reading output file')
output_dir = self.task.rpc.GetOutputDir()
path = self.task.rpc.PathJoin(output_dir, '%s.%s' % (key, pipe))
actual = self.task.rpc.ReadFile(path)
self.assertEqual(expected, actual)
if __name__ == '__main__':
legion_test_case.main()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Adds unittest-esque functionality to Legion."""
import argparse
import logging
import sys
import unittest
# pylint: disable=relative-import
# Import common_lib first so we can setup the environment
from lib import common_lib
common_lib.SetupEnvironment()
from legion.lib import task_controller
from legion.lib import task_registration_server
from legion.lib.comm_server import comm_server
BANNER_WIDTH = 80
class TestCase(unittest.TestCase):
"""Test case class with added Legion support."""
_registration_server = None
_initialized = False
@classmethod
def __new__(cls, *args, **kwargs):
"""Initialize the class and return a new instance."""
cls._InitializeClass()
return super(TestCase, cls).__new__(*args, **kwargs)
def __init__(self, test_name='runTest'):
super(TestCase, self).__init__(test_name)
method = getattr(self, test_name, None)
if method:
# Install the _RunTest method
self._TestMethod = method
setattr(self, test_name, self._RunTest)
self._output_dir = None
@property
def output_dir(self):
if not self._output_dir:
self._output_dir = self.rpc.GetOutputDir()
return self._output_dir
def _RunTest(self):
"""Runs the test method and provides banner info and error reporting."""
self._LogInfoBanner(self._testMethodName, self.shortDescription())
try:
return self._TestMethod()
except:
exc_info = sys.exc_info()
logging.error('', exc_info=exc_info)
raise exc_info[0], exc_info[1], exc_info[2]
@classmethod
def _InitializeClass(cls):
"""Handles class level initialization.
There are 2 types of setup/teardown methods that always need to be run:
1) Framework level setup/teardown
2) Test case level setup/teardown
This method installs handlers in place of setUpClass and tearDownClass that
will ensure both types of setup/teardown methods are called correctly.
"""
if cls._initialized:
return
cls._OriginalSetUpClassMethod = cls.setUpClass
cls.setUpClass = cls._HandleSetUpClass
cls._OriginalTearDownClassMethod = cls.tearDownClass
cls.tearDownClass = cls._HandleTearDownClass
cls._initialized = True
@classmethod
def _LogInfoBanner(cls, method_name, method_doc=None):
"""Formats and logs test case information."""
logging.info('*' * BANNER_WIDTH)
logging.info(method_name.center(BANNER_WIDTH))
if method_doc:
for line in method_doc.split('\n'):
logging.info(line.center(BANNER_WIDTH))
logging.info('*' * BANNER_WIDTH)
@classmethod
def CreateTask(cls, *args, **kwargs):
"""Convenience method to create a new task."""
task = task_controller.TaskController(
reg_server_port=cls._registration_server.port, *args, **kwargs)
cls._registration_server.RegisterTaskCallback(
task.otp, task.OnConnect)
return task
@classmethod
def _SetUpFramework(cls):
"""Perform the framework-specific setup operations."""
# Setup the registration server
cls._registration_server = (
task_registration_server.TaskRegistrationServer())
common_lib.OnShutdown += cls._registration_server.Shutdown
cls._registration_server.Start()
# Setup the event server
cls.comm_server = comm_server.CommServer()
common_lib.OnShutdown += cls.comm_server.shutdown
cls.comm_server.start()
@classmethod
def _TearDownFramework(cls):
"""Perform the framework-specific teardown operations."""
common_lib.Shutdown()
@classmethod
def _HandleSetUpClass(cls):
"""Performs common class-level setup operations.
This method performs test-wide setup such as starting the registration
server and then calls the original setUpClass method."""
try:
cls._LogInfoBanner('setUpClass', 'Performs class level setup.')
cls._SetUpFramework()
cls._OriginalSetUpClassMethod()
except:
# Make sure we tear down in case of any exceptions
cls._HandleTearDownClass(setup_failed=True)
exc_info = sys.exc_info()
logging.error('', exc_info=exc_info)
raise exc_info[0], exc_info[1], exc_info[2]
@classmethod
def _HandleTearDownClass(cls, setup_failed=False):
"""Performs common class-level tear down operations.
This method calls the original tearDownClass then performs test-wide
tear down such as stopping the registration server.
"""
cls._LogInfoBanner('tearDownClass', 'Performs class level tear down.')
try:
if not setup_failed:
cls._OriginalTearDownClassMethod()
finally:
cls._TearDownFramework()
def main():
unittest.main(argv=sys.argv[:1])
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Base handler class for all category handlers."""
class BaseHandler(object):
"""Sets up default verb handlers for the child class."""
def do_PUT(self, request):
request.send_response(501)
def do_POST(self, request):
request.send_response(501)
def do_GET(self, request):
request.send_response(501)
def do_DELETE(self, request):
request.send_response(501)
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Communications server.
This is the HTTP server class. The server is run in a separate thread allowing
the calling code to proceed normally after calling start(). shutdown() must
be called to tear the server down. Failure to do this will most likely end up
hanging the program.
"""
import BaseHTTPServer
import SocketServer
import threading
from legion.lib import common_lib
from legion.lib.comm_server import server_handler
class CommServer(SocketServer.ThreadingMixIn,
BaseHTTPServer.HTTPServer):
"""An extension of the HTTPServer class which handles requests in threads."""
def __init__(self, address='', port=None):
self._port = port or common_lib.GetUnusedPort()
self._address = address
BaseHTTPServer.HTTPServer.__init__(self,
(self._address, self._port),
server_handler.ServerHandler)
@property
def port(self):
return self._port
@property
def address(self):
return self._address
def start(self):
"""Starts the server in another thread.
The thread will stay active until shutdown() is called. There is no reason
to hold a reference to the thread object.
The naming convention used here (lowercase) is used to match the base
server's naming convention.
"""
threading.Thread(target=self.serve_forever).start()
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the handler for /messages/<NAME> paths.
The name of the message is expected to follow the /messages/ portion of the
path. The body of the request will contain the message, both when uploading
the message as well as retrieving it. The message will remain on the server
until the calling code does an explicit DELETE call to remove it.
The message is optional. This allows the caller to use this as a simple signal
server. The return code can always be used to tell if a message with that name
exists on the server (200 exists, 404 doesn't exist).
When uploading a body ensure the content-length header is passed correctly.
If the content-length isn't passed no data is read from the body. If its set
too low only part of the message will be read. If its set too high the server
will block waiting for more data to be uploaded.
"""
import re
import threading
from legion.lib.comm_server import base_handler
class MessageHandler(base_handler.BaseHandler):
"""Handles /messages/<NAME> requests."""
_REGEX = '/messages/(?P<name>[a-zA-Z0-9_.-~]+)'
_messages = {}
_message_lock = threading.Lock()
def _GetName(self, request):
"""Gets the message name from the URL."""
match = re.match(self._REGEX, request.path)
if not match:
return None
return match.group('name')
def do_PUT(self, request):
"""Handles PUT requests."""
name = self._GetName(request)
if not name:
return request.send_error(405, 'Key name required')
with self._message_lock:
self._messages[name] = request.rfile.read(
int(request.headers.getheader('content-length', 0)))
return request.send_response(200)
def do_GET(self, request):
"""Handles GET requests."""
name = self._GetName(request)
if not name:
return request.send_error(405, 'Key name required')
elif name not in self._messages:
return request.send_error(404, 'Key not found')
with self._message_lock:
request.send_response(200)
request.send_header('Content-type', 'text/plain')
request.send_header('Content-Length', str(len(self._messages[name])))
request.end_headers()
request.wfile.write(self._messages[name])
def do_DELETE(self, request):
"""Handles DELETE requests."""
name = self._GetName(request)
if not name:
return request.send_error(405, 'Key name required')
with self._message_lock:
if name in self._messages:
del self._messages[name]
return request.send_response(200)
else:
return request.send_error(404, 'Key not found')
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Handler used directly by the server.
This handler routes the request to the correct subhandler based on the first
value in the URL path. For example, the MessageHandler has been added to the
class's _HANDLERS object and handles all requests destined for URL/messages/...
To extend this functionality implement a handler and add it to the _HANDLERS
object with the correct category. The category is defined as the first part of
the URL path (i.e. URL/<CATEGORY>). The handler will then be called any time a
request comes in with that category.
"""
import re
import SimpleHTTPServer
# Import all communications handlers
from legion.lib.comm_server import message_handler
class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""Server handler class."""
_HANDLERS = {
'messages': message_handler.MessageHandler,
}
_REGEX = '/(?P<category>[a-zA-Z0-9_.-~]+)/'
def log_message(self, *args, **kwargs):
"""Silence those pesky server-side print statements."""
pass
def _GetCategoryName(self):
"""Extracts and returns the category name."""
match = re.match(self._REGEX, self.path)
if not match:
return
return match.group('category')
def _GetHandler(self):
"""Returns the category handler object if it exists."""
category = self._GetCategoryName()
if not category:
return self.send_error(403, 'Category must be supplied in the form of '
'/category_name/...')
handler = self._HANDLERS.get(category)
if not handler:
return self.send_error(405, 'No handler found for /%s/' % category)
return handler()
def do_GET(self):
"""Dispatches GET requests."""
handler = self._GetHandler()
if handler:
handler.do_GET(self)
def do_POST(self):
"""Dispatches POST requests."""
handler = self._GetHandler()
if handler:
handler.do_POST(self)
def do_PUT(self):
"""Dispatches PUT requests."""
handler = self._GetHandler()
if handler:
handler.do_PUT(self)
def do_DELETE(self):
"""Dispatches DELETE requests."""
handler = self._GetHandler()
if handler:
handler.do_DELETE(self)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common library methods used by both coordinator and task machines."""
import argparse
import logging
import os
import socket
import sys
# pylint: disable=relative-import
# Import event directly here since it is used to decorate a module-level method.
import event
LOGGING_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'WARN', 'ERROR']
MY_IP = socket.gethostbyname(socket.gethostname())
DEFAULT_TIMEOUT_SECS = 30 * 60 # 30 minutes
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
LEGION_IMPORT_FIX = os.path.join(THIS_DIR, '..', '..')
SWARMING_DIR = os.path.join(THIS_DIR, '..', '..', '..', 'tools',
'swarming_client')
def InitLogging():
"""Initialize the logging module.
Raises:
argparse.ArgumentError if the --verbosity arg is incorrect.
"""
parser = argparse.ArgumentParser()
logging_action = parser.add_argument('--verbosity', default='INFO')
args, _ = parser.parse_known_args()
if args.verbosity not in LOGGING_LEVELS:
raise argparse.ArgumentError(
logging_action, 'Only levels %s supported' % str(LOGGING_LEVELS))
logging.basicConfig(
format='%(asctime)s %(filename)s:%(lineno)s %(levelname)s] %(message)s',
datefmt='%H:%M:%S', level=args.verbosity)
def GetOutputDir():
"""Get the isolated output directory specified on the command line."""
parser = argparse.ArgumentParser()
parser.add_argument('--output-dir')
args, _ = parser.parse_known_args()
return args.output_dir
def GetUnusedPort():
"""Finds and returns an unused port."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0))
_, port = s.getsockname()
s.close()
return port
def SetupEnvironment():
"""Perform all environmental setup steps needed."""
InitLogging()
sys.path.append(LEGION_IMPORT_FIX)
sys.path.append(SWARMING_DIR)
def Shutdown():
"""Raises the on_shutdown event."""
OnShutdown()
@event.Event
def OnShutdown():
"""Shutdown event dispatcher.
To use this simply use the following code example:
common_lib.OnShutdown += my_handler_method
my_handler_method will be called when OnShutdown is called (this is done via
the Shutdown method above, but can be called directly as well.
"""
pass
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Java-like event module.
To use this simply decorate the callback method with @event.Event, subscribe
to the callbacks using method += callback, and call the event method to
send the events.
class Foo(object):
@event.Event
def on_event_raised_1(self):
# This is defined as a pass since this won't directly operate on the call.
# This event takes no args when called.
pass
@event.Event
def on_event_raised_2(self, arg):
# This event takes 1 arg
pass
def do_something(self):
# This method will raise an event on each handler
self.on_event_raised_1()
self.on_event_raised_2('foo')
To subscribe to events use the following code
def callback_1():
print 'In callback 1'
def callback_2(arg):
print 'In callback 2', arg
foo = Foo()
foo.on_event_raised_1 += callback_1
foo.on_event_raised_2 += callback_2
foo.do_something()
Each event can be associated with zero or more callback handlers, and each
callback handler can be associated with one or more events.
"""
import logging
import traceback
class Event(object):
""""A Java-like event class."""
def __init__(self, method):
self._method = method
self._callbacks = []
def __iadd__(self, callback):
"""Allow method += callback syntax."""
assert callback not in self._callbacks
self._callbacks.append(callback)
return self
def __isub__(self, callback):
"""Allow method -= callback syntax."""
self._callbacks.remove(callback)
return self
def __call__(self, *args, **kwargs):
"""Dispatches a method call to the appropriate callback handlers."""
for callback in self._callbacks:
try:
callback(*args, **kwargs)
except: # pylint: disable=bare-except
# Catch all exceptions here and log them. This way one exception won't
# stop the remaining callbacks from being executed.
logging.error(traceback.format_exc())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""RPC compatible subprocess-type module.
This module defined both a task-side process class as well as a controller-side
process wrapper for easier access and usage of the task-side process.
"""
import logging
import os
import subprocess
import sys
import threading
import time
from legion.lib import common_lib
from utils import subprocess42
class TimeoutError(Exception):
pass
class ControllerProcessWrapper(object):
"""Controller-side process wrapper class.
This class provides a more intuitive interface to task-side processes
than calling the methods directly using the RPC object.
"""
def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None,
key=None, shell=None):
logging.debug('Creating a process with cmd=%s', cmd)
self._rpc = rpc
self._key = rpc.subprocess.Process(cmd, key)
logging.debug('Process created with key=%s', self._key)
if verbose:
self._rpc.subprocess.SetVerbose(self._key)
if detached:
self._rpc.subprocess.SetDetached(self._key)
if cwd:
self._rpc.subprocess.SetCwd(self._key, cwd)
if shell:
self._rpc.subprocess.SetShell(self._key)
self._rpc.subprocess.Start(self._key)
@property
def key(self):
return self._key
def Terminate(self):
logging.debug('Terminating process %s', self._key)
return self._rpc.subprocess.Terminate(self._key)
def Kill(self):
logging.debug('Killing process %s', self._key)
self._rpc.subprocess.Kill(self._key)
def Delete(self):
return self._rpc.subprocess.Delete(self._key)
def GetReturncode(self):
return self._rpc.subprocess.GetReturncode(self._key)
def ReadStdout(self):
"""Returns all stdout since the last call to ReadStdout.
This call allows the user to read stdout while the process is running.
However each call will flush the local stdout buffer. In order to make
multiple calls to ReadStdout and to retain the entire output the results
of this call will need to be buffered in the calling code.
"""
return self._rpc.subprocess.ReadStdout(self._key)
def ReadStderr(self):
"""Returns all stderr read since the last call to ReadStderr.
See ReadStdout for additional details.
"""
return self._rpc.subprocess.ReadStderr(self._key)
def ReadOutput(self):
"""Returns the (stdout, stderr) since the last Read* call.
See ReadStdout for additional details.
"""
return self._rpc.subprocess.ReadOutput(self._key)
def Wait(self, timeout=None):
return self._rpc.subprocess.Wait(self._key, timeout)
def Poll(self):
return self._rpc.subprocess.Poll(self._key)
def GetPid(self):
return self._rpc.subprocess.GetPid(self._key)
class Process(object):
"""Implements a task-side non-blocking subprocess.
This non-blocking subprocess allows the caller to continue operating while
also able to interact with this subprocess based on a key returned to
the caller at the time of creation.
Creation args are set via Set* methods called after calling Process but
before calling Start. This is due to a limitation of the XML-RPC
implementation not supporting keyword arguments.
"""
_processes = {}
_process_next_id = 0
_creation_lock = threading.Lock()
def __init__(self, cmd, key):
self.stdout = ''
self.stderr = ''
self.key = key
self.cmd = cmd
self.proc = None
self.cwd = None
self.shell = False
self.verbose = False
self.detached = False
self.complete = False
self.data_lock = threading.Lock()
self.stdout_file = open(self._CreateOutputFilename('stdout'), 'wb+')
self.stderr_file = open(self._CreateOutputFilename('stderr'), 'wb+')
def _CreateOutputFilename(self, fname):
return os.path.join(common_lib.GetOutputDir(), '%s.%s' % (self.key, fname))
def __str__(self):
return '%r, cwd=%r, verbose=%r, detached=%r' % (
self.cmd, self.cwd, self.verbose, self.detached)
def _reader(self):
for pipe, data in self.proc.yield_any():
with self.data_lock:
if pipe == 'stdout':
self.stdout += data
self.stdout_file.write(data)
self.stdout_file.flush()
if self.verbose:
sys.stdout.write(data)
else:
self.stderr += data
self.stderr_file.write(data)
self.stderr_file.flush()
if self.verbose:
sys.stderr.write(data)
self.complete = True
@classmethod
def KillAll(cls):
for key in cls._processes:
cls.Kill(key)
@classmethod
def Process(cls, cmd, key=None):
with cls._creation_lock:
if not key:
key = 'Process%d' % cls._process_next_id
cls._process_next_id += 1
if key in cls._processes:
raise KeyError('Key %s already in use' % key)
logging.debug('Creating process %s with cmd %r', key, cmd)
cls._processes[key] = cls(cmd, key)
return key
def _Start(self):
logging.info('Starting process %s', self)
self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
stderr=subprocess42.PIPE,
detached=self.detached, cwd=self.cwd,
shell=self.shell)
threading.Thread(target=self._reader).start()
@classmethod
def Start(cls, key):
cls._processes[key]._Start()
@classmethod
def SetCwd(cls, key, cwd):
"""Sets the process's cwd."""
logging.debug('Setting %s cwd to %s', key, cwd)
cls._processes[key].cwd = cwd
@classmethod
def SetShell(cls, key):
"""Sets the process's shell arg to True."""
logging.debug('Setting %s.shell = True', key)
cls._processes[key].shell = True
@classmethod
def SetDetached(cls, key):
"""Creates a detached process."""
logging.debug('Setting %s.detached = True', key)
cls._processes[key].detached = True
@classmethod
def SetVerbose(cls, key):
"""Sets the stdout and stderr to be emitted locally."""
logging.debug('Setting %s.verbose = True', key)
cls._processes[key].verbose = True
@classmethod
def Terminate(cls, key):
logging.debug('Terminating process %s', key)
cls._processes[key].proc.terminate()
@classmethod
def Kill(cls, key):
logging.debug('Killing process %s', key)
cls._processes[key].proc.kill()
@classmethod
def Delete(cls, key):
if cls.GetReturncode(key) is None:
logging.warning('Killing %s before deleting it', key)
cls.Kill(key)
logging.debug('Deleting process %s', key)
cls._processes.pop(key)
@classmethod
def GetReturncode(cls, key):
return cls._processes[key].proc.returncode
@classmethod
def ReadStdout(cls, key):
"""Returns all stdout since the last call to ReadStdout.
This call allows the user to read stdout while the process is running.
However each call will flush the local stdout buffer. In order to make
multiple calls to ReadStdout and to retain the entire output the results
of this call will need to be buffered in the calling code.
"""
proc = cls._processes[key]
with proc.data_lock:
# Perform a "read" on the stdout data
stdout = proc.stdout
proc.stdout = ''
return stdout
@classmethod
def ReadStderr(cls, key):
"""Returns all stderr read since the last call to ReadStderr.
See ReadStdout for additional details.
"""
proc = cls._processes[key]
with proc.data_lock:
# Perform a "read" on the stderr data
stderr = proc.stderr
proc.stderr = ''
return stderr
@classmethod
def ReadOutput(cls, key):
"""Returns the (stdout, stderr) since the last Read* call.
See ReadStdout for additional details.
"""
return cls.ReadStdout(key), cls.ReadStderr(key)
@classmethod
def Wait(cls, key, timeout=None):
"""Wait for the process to complete.
We wait for all of the output to be written before returning. This solves
a race condition found on Windows where the output can lag behind the
wait call.
Raises:
TimeoutError if the process doesn't finish in the specified timeout.
"""
end = None if timeout is None else timeout + time.time()
while end is None or end > time.time():
if cls._processes[key].complete:
return
time.sleep(0.05)
raise TimeoutError()
@classmethod
def Poll(cls, key):
return cls._processes[key].proc.poll()
@classmethod
def GetPid(cls, key):
return cls._processes[key].proc.pid
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module to implement the SimpleXMLRPCServer module using JSON-RPC.
This module uses SimpleXMLRPCServer as the base and only overrides those
portions that implement the XML-RPC protocol. These portions are rewritten
to use the JSON-RPC protocol instead.
When large portions of code need to be rewritten the original code and
comments are preserved. The intention here is to keep the amount of code
change to a minimum.
This module only depends on default Python modules, as well as jsonrpclib
which also uses only default modules. No third party code is required to
use this module.
"""
import json
import SimpleXMLRPCServer as _base
import SocketServer
import sys
import traceback
try:
import fcntl
except ImportError:
fcntl = None
try:
import gzip
except ImportError:
gzip = None #python can be built without zlib/gzip support
#pylint: disable=relative-import
import jsonrpclib
class SimpleJSONRPCRequestHandler(_base.SimpleXMLRPCRequestHandler):
"""Request handler class for received requests.
This class extends the functionality of SimpleXMLRPCRequestHandler and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
def do_POST(self):
"""Handles the HTTP POST request.
Attempts to interpret all HTTP POST requests as JSON-RPC calls,
which are forwarded to the server's _dispatch method for handling.
"""
# Check that the path is legal
if not self.is_rpc_path_valid():
self.report_404()
return
try:
# Get arguments by reading body of request.
# We read this in chunks to avoid straining
# socket.read(); around the 10 or 15Mb mark, some platforms
# begin to have problems (bug #792570).
max_chunk_size = 10*1024*1024
size_remaining = int(self.headers['content-length'])
data = []
while size_remaining:
chunk_size = min(size_remaining, max_chunk_size)
chunk = self.rfile.read(chunk_size)
if not chunk:
break
data.append(chunk)
size_remaining -= len(data[-1])
data = ''.join(data)
data = self.decode_request_content(data)
if data is None:
return # response has been sent
# In previous versions of SimpleXMLRPCServer, _dispatch
# could be overridden in this class, instead of in
# SimpleXMLRPCDispatcher. To maintain backwards compatibility,
# check to see if a subclass implements _dispatch and dispatch
# using that method if present.
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None), self.path)
except Exception, e: # This should only happen if the module is buggy
# internal error, report as HTTP server error
self.send_response(500)
# Send information about the exception if requested
if (hasattr(self.server, '_send_traceback_header') and
self.server._send_traceback_header):
self.send_header('X-exception', str(e))
self.send_header('X-traceback', traceback.format_exc())
self.send_header('Content-length', '0')
self.end_headers()
else:
# got a valid JSON RPC response
self.send_response(200)
self.send_header('Content-type', 'application/json')
if self.encode_threshold is not None:
if len(response) > self.encode_threshold:
q = self.accept_encodings().get('gzip', 0)
if q:
try:
response = jsonrpclib.gzip_encode(response)
self.send_header('Content-Encoding', 'gzip')
except NotImplementedError:
pass
self.send_header('Content-length', str(len(response)))
self.end_headers()
self.wfile.write(response)
class SimpleJSONRPCDispatcher(_base.SimpleXMLRPCDispatcher):
"""Dispatcher for received JSON-RPC requests.
This class extends the functionality of SimpleXMLRPCDispatcher and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
"""Dispatches an JSON-RPC method from marshalled (JSON) data.
JSON-RPC methods are dispatched from the marshalled (JSON) data
using the _dispatch method and the result is returned as
marshalled data. For backwards compatibility, a dispatch
function can be provided as an argument (see comment in
SimpleJSONRPCRequestHandler.do_POST) but overriding the
existing method through subclassing is the preferred means
of changing method dispatch behavior.
Returns:
The JSON-RPC string to return.
"""
method = ''
params = []
ident = ''
try:
request = json.loads(data)
jsonrpclib.ValidateRequest(request)
method = request['method']
params = request['params']
ident = request['id']
# generate response
if dispatch_method is not None:
response = dispatch_method(method, params)
else:
response = self._dispatch(method, params)
response = jsonrpclib.CreateResponseString(response, ident)
except jsonrpclib.Fault as fault:
response = jsonrpclib.CreateResponseString(fault, ident)
# Catch all exceptions here as they should be raised on the caller side.
except: #pylint: disable=bare-except
# report exception back to server
exc_type, exc_value, _ = sys.exc_info()
response = jsonrpclib.CreateResponseString(
jsonrpclib.Fault(1, '%s:%s' % (exc_type, exc_value)), ident)
return response
class SimpleJSONRPCServer(SocketServer.TCPServer,
SimpleJSONRPCDispatcher):
"""Simple JSON-RPC server.
This class mimics the functionality of SimpleXMLRPCServer and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
allow_reuse_address = True
# Warning: this is for debugging purposes only! Never set this to True in
# production code, as will be sending out sensitive information (exception
# and stack trace details) when exceptions are raised inside
# SimpleJSONRPCRequestHandler.do_POST
_send_traceback_header = False
def __init__(self, addr, requestHandler=SimpleJSONRPCRequestHandler,
logRequests=True, allow_none=False, encoding=None,
bind_and_activate=True):
self.logRequests = logRequests
SimpleJSONRPCDispatcher.__init__(self, allow_none, encoding)
SocketServer.TCPServer.__init__(self, addr, requestHandler,
bind_and_activate)
# [Bug #1222790] If possible, set close-on-exec flag; if a
# method spawns a subprocess, the subprocess shouldn't have
# the listening socket open.
if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module to implement the JSON-RPC protocol.
This module uses xmlrpclib as the base and only overrides those
portions that implement the XML-RPC protocol. These portions are rewritten
to use the JSON-RPC protocol instead.
When large portions of code need to be rewritten the original code and
comments are preserved. The intention here is to keep the amount of code
change to a minimum.
This module only depends on default Python modules. No third party code is
required to use this module.
"""
# pylint: disable=no-value-for-parameter
import json
import urllib
import xmlrpclib as _base
__version__ = '1.0.0'
gzip_encode = _base.gzip_encode
gzip = _base.gzip
class Error(Exception):
def __str__(self):
return repr(self)
class ProtocolError(Error):
"""Indicates a JSON protocol error."""
def __init__(self, url, errcode, errmsg, headers):
Error.__init__(self)
self.url = url
self.errcode = errcode
self.errmsg = errmsg
self.headers = headers
def __repr__(self):
return (
'<ProtocolError for %s: %s %s>' %
(self.url, self.errcode, self.errmsg))
class ResponseError(Error):
"""Indicates a broken response package."""
pass
class Fault(Error):
"""Indicates a JSON-RPC fault package."""
def __init__(self, code, message):
Error.__init__(self)
if not isinstance(code, int):
raise ProtocolError('Fault code must be an integer.')
self.code = code
self.message = message
def __repr__(self):
return (
'<Fault %s: %s>' %
(self.code, repr(self.message))
)
def CreateRequest(methodname, params, ident=''):
"""Create a valid JSON-RPC request.
Args:
methodname: The name of the remote method to invoke.
params: The parameters to pass to the remote method. This should be a
list or tuple and able to be encoded by the default JSON parser.
Returns:
A valid JSON-RPC request object.
"""
request = {
'jsonrpc': '2.0',
'method': methodname,
'params': params,
'id': ident
}
return request
def CreateRequestString(methodname, params, ident=''):
"""Create a valid JSON-RPC request string.
Args:
methodname: The name of the remote method to invoke.
params: The parameters to pass to the remote method.
These parameters need to be encode-able by the default JSON parser.
ident: The request identifier.
Returns:
A valid JSON-RPC request string.
"""
return json.dumps(CreateRequest(methodname, params, ident))
def CreateResponse(data, ident):
"""Create a JSON-RPC response.
Args:
data: The data to return.
ident: The response identifier.
Returns:
A valid JSON-RPC response object.
"""
if isinstance(data, Fault):
response = {
'jsonrpc': '2.0',
'error': {
'code': data.code,
'message': data.message},
'id': ident
}
else:
response = {
'jsonrpc': '2.0',
'response': data,
'id': ident
}
return response
def CreateResponseString(data, ident):
"""Create a JSON-RPC response string.
Args:
data: The data to return.
ident: The response identifier.
Returns:
A valid JSON-RPC response object.
"""
return json.dumps(CreateResponse(data, ident))
def ParseHTTPResponse(response):
"""Parse an HTTP response object and return the JSON object.
Args:
response: An HTTP response object.
Returns:
The returned JSON-RPC object.
Raises:
ProtocolError: if the object format is not correct.
Fault: If a Fault error is returned from the server.
"""
# Check for new http response object, else it is a file object
if hasattr(response, 'getheader'):
if response.getheader('Content-Encoding', '') == 'gzip':
stream = _base.GzipDecodedResponse(response)
else:
stream = response
else:
stream = response
data = ''
while 1:
chunk = stream.read(1024)
if not chunk:
break
data += chunk
response = json.loads(data)
ValidateBasicJSONRPCData(response)
if 'response' in response:
ValidateResponse(response)
return response['response']
elif 'error' in response:
ValidateError(response)
code = response['error']['code']
message = response['error']['message']
raise Fault(code, message)
else:
raise ProtocolError('No valid JSON returned')
def ValidateRequest(data):
"""Validate a JSON-RPC request object.
Args:
data: The JSON-RPC object (dict).
Raises:
ProtocolError: if the object format is not correct.
"""
ValidateBasicJSONRPCData(data)
if 'method' not in data or 'params' not in data:
raise ProtocolError('JSON is not a valid request')
def ValidateResponse(data):
"""Validate a JSON-RPC response object.
Args:
data: The JSON-RPC object (dict).
Raises:
ProtocolError: if the object format is not correct.
"""
ValidateBasicJSONRPCData(data)
if 'response' not in data:
raise ProtocolError('JSON is not a valid response')
def ValidateError(data):
"""Validate a JSON-RPC error object.
Args:
data: The JSON-RPC object (dict).
Raises:
ProtocolError: if the object format is not correct.
"""
ValidateBasicJSONRPCData(data)
if ('error' not in data or
'code' not in data['error'] or
'message' not in data['error']):
raise ProtocolError('JSON is not a valid error response')
def ValidateBasicJSONRPCData(data):
"""Validate a basic JSON-RPC object.
Args:
data: The JSON-RPC object (dict).
Raises:
ProtocolError: if the object format is not correct.
"""
error = None
if not isinstance(data, dict):
error = 'JSON data is not a dictionary'
elif 'jsonrpc' not in data or data['jsonrpc'] != '2.0':
error = 'JSON is not a valid JSON RPC 2.0 message'
elif 'id' not in data:
error = 'JSON data missing required id entry'
if error:
raise ProtocolError(error)
class Transport(_base.Transport):
"""RPC transport class.
This class extends the functionality of xmlrpclib.Transport and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
user_agent = 'jsonrpclib.py/' + __version__
def send_content(self, connection, request_body):
"""Send the request."""
connection.putheader('Content-Type','application/json')
#optionally encode the request
if (self.encode_threshold is not None and
self.encode_threshold < len(request_body) and
gzip):
connection.putheader('Content-Encoding', 'gzip')
request_body = gzip_encode(request_body)
connection.putheader('Content-Length', str(len(request_body)))
connection.endheaders(request_body)
def single_request(self, host, handler, request_body, verbose=0):
"""Issue a single JSON-RPC request."""
h = self.make_connection(host)
if verbose:
h.set_debuglevel(1)
try:
self.send_request(h, handler, request_body)
self.send_host(h, host)
self.send_user_agent(h)
self.send_content(h, request_body)
response = h.getresponse(buffering=True)
if response.status == 200:
self.verbose = verbose #pylint: disable=attribute-defined-outside-init
return self.parse_response(response)
except Fault:
raise
except Exception:
# All unexpected errors leave connection in
# a strange state, so we clear it.
self.close()
raise
# discard any response data and raise exception
if response.getheader('content-length', 0):
response.read()
raise ProtocolError(
host + handler,
response.status, response.reason,
response.msg,
)
def parse_response(self, response):
"""Parse the HTTP resoponse from the server."""
return ParseHTTPResponse(response)
class SafeTransport(_base.SafeTransport):
"""Transport class for HTTPS servers.
This class extends the functionality of xmlrpclib.SafeTransport and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
def parse_response(self, response):
return ParseHTTPResponse(response)
class ServerProxy(_base.ServerProxy):
"""Proxy class to the RPC server.
This class extends the functionality of xmlrpclib.ServerProxy and only
overrides the operations needed to change the protocol from XML-RPC to
JSON-RPC.
"""
def __init__(self, uri, transport=None, encoding=None, verbose=0,
allow_none=0, use_datetime=0):
urltype, _ = urllib.splittype(uri)
if urltype not in ('http', 'https'):
raise IOError('unsupported JSON-RPC protocol')
_base.ServerProxy.__init__(self, uri, transport, encoding, verbose,
allow_none, use_datetime)
transport_type, uri = urllib.splittype(uri)
if transport is None:
if transport_type == 'https':
transport = SafeTransport(use_datetime=use_datetime)
else:
transport = Transport(use_datetime=use_datetime)
self.__transport = transport
def __request(self, methodname, params):
"""Call a method on the remote server."""
request = CreateRequestString(methodname, params)
response = self.__transport.request(
self.__host,
self.__handler,
request,
verbose=self.__verbose
)
return response
Server = ServerProxy
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the task RPC methods."""
import logging
import os
import platform
import socket
import sys
import threading
from legion.lib import common_lib
from legion.lib import process
class RPCMethods(object):
"""Class exposing RPC methods."""
_dotted_whitelist = ['subprocess']
def __init__(self, server):
self._server = server
self.subprocess = process.Process
def _dispatch(self, method, params):
obj = self
if '.' in method:
# Allow only white listed dotted names
name, method = method.split('.')
assert name in self._dotted_whitelist
obj = getattr(self, name)
return getattr(obj, method)(*params)
def Echo(self, message):
"""Simple RPC method to print and return a message."""
logging.info('Echoing %s', message)
return 'echo %s' % str(message)
def AbsPath(self, path):
"""Returns the absolute path."""
return os.path.abspath(path)
def Quit(self):
"""Call _server.shutdown in another thread.
This is needed because server.shutdown waits for the server to actually
quit. However the server cannot shutdown until it completes handling this
call. Calling this in the same thread results in a deadlock.
"""
t = threading.Thread(target=self._server.shutdown)
t.start()
def GetOutputDir(self):
"""Returns the isolated output directory on the task machine."""
return common_lib.GetOutputDir()
def WriteFile(self, path, text, mode='wb+'):
"""Writes a file on the task machine."""
with open(path, mode) as fh:
fh.write(text)
def ReadFile(self, path, mode='rb'):
"""Reads a file from the local task machine."""
with open(path, mode) as fh:
return fh.read()
def PathJoin(self, *parts):
"""Performs an os.path.join on the task machine.
This is needed due to the fact that there is no guarantee that os.sep will
be the same across all machines in a particular test. This method will
join the path parts locally to ensure the correct separator is used.
"""
return os.path.join(*parts)
def ListDir(self, path):
"""Returns the results of os.listdir."""
return os.listdir(path)
def GetIpAddress(self):
"""Returns the local IPv4 address."""
return socket.gethostbyname(socket.gethostname())
def GetHostname(self):
"""Returns the hostname."""
return socket.gethostname()
def GetPlatform(self):
"""Returns the value of platform.platform()."""
return platform.platform()
def GetExecutable(self):
"""Returns the value of sys.executable."""
return sys.executable
def GetCwd(self):
"""Returns the value of os.getcwd()."""
return os.getcwd()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The task RPC server code.
This server is an XML-RPC server which serves code from
rpc_methods.RPCMethods.
This server will run until shutdown is called on the server object. This can
be achieved in 2 ways:
- Calling the Quit RPC method defined in RPCMethods
- Not receiving any calls within the idle_timeout_secs time.
"""
import logging
import threading
import time
import SocketServer
from legion.lib import common_lib
from legion.lib.rpc import jsonrpclib
from legion.lib.rpc import rpc_methods
from legion.lib.rpc import SimpleJSONRPCServer
class RequestHandler(SimpleJSONRPCServer.SimpleJSONRPCRequestHandler):
"""Restricts access to only specified IP address.
This call assumes the server is RPCServer.
"""
def do_POST(self):
"""Verifies the task is authorized to perform RPCs."""
if self.client_address[0] != self.server.authorized_address:
logging.error('Received unauthorized RPC request from %s',
self.task_address[0])
self.send_response(403)
response = 'Forbidden'
self.send_header('Content-type', 'text/plain')
self.send_header('Content-length', str(len(response)))
self.end_headers()
self.wfile.write(response)
else:
return SimpleJSONRPCServer.SimpleJSONRPCRequestHandler.do_POST(self)
class RpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer,
SocketServer.ThreadingMixIn):
"""Restricts all endpoints to only specified IP addresses."""
def __init__(self, authorized_address, port,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS):
SimpleJSONRPCServer.SimpleJSONRPCServer.__init__(
self, ('', port),
allow_none=True, logRequests=False,
requestHandler=RequestHandler)
self.authorized_address = authorized_address
self.idle_timeout_secs = idle_timeout_secs
self.register_instance(rpc_methods.RPCMethods(self))
self._shutdown_requested_event = threading.Event()
self._rpc_received_event = threading.Event()
self._idle_thread = threading.Thread(target=self._CheckForIdleQuit)
def shutdown(self):
"""Shutdown the server.
This overloaded method sets the _shutdown_requested_event to allow the
idle timeout thread to quit.
"""
self._shutdown_requested_event.set()
SimpleJSONRPCServer.SimpleJSONRPCServer.shutdown(self)
logging.info('Server shutdown complete')
def serve_forever(self, poll_interval=0.5):
"""Serve forever.
This overloaded method starts the idle timeout thread before calling
serve_forever. This ensures the idle timer thread doesn't get started
without the server running.
Args:
poll_interval: The interval to poll for shutdown.
"""
logging.info('RPC server starting')
self._idle_thread.start()
SimpleJSONRPCServer.SimpleJSONRPCServer.serve_forever(self, poll_interval)
def _dispatch(self, method, params):
"""Dispatch the call to the correct method with the provided params.
This overloaded method adds logging to help trace connection and
call problems.
Args:
method: The method name to call.
params: A tuple of parameters to pass.
Returns:
The result of the parent class' _dispatch method.
"""
logging.debug('Calling %s%s', method, params)
self._rpc_received_event.set()
return SimpleJSONRPCServer.SimpleJSONRPCServer._dispatch(
self, method, params)
def _CheckForIdleQuit(self):
"""Check for, and exit, if the server is idle for too long.
This method must be run in a separate thread to avoid a deadlock when
calling server.shutdown.
"""
timeout = time.time() + self.idle_timeout_secs
while time.time() < timeout:
if self._shutdown_requested_event.is_set():
# An external source called shutdown()
return
elif self._rpc_received_event.is_set():
logging.debug('Resetting the idle timeout')
timeout = time.time() + self.idle_timeout_secs
self._rpc_received_event.clear()
time.sleep(1)
# We timed out, kill the server
logging.warning('Shutting down the server due to the idle timeout')
self.shutdown()
@staticmethod
def Connect(server, port):
"""Creates and returns a connection to an RPC server."""
addr = 'http://%s:%d' % (server, port)
logging.debug('Connecting to RPC server at %s', addr)
return jsonrpclib.ServerProxy(addr, allow_none=True)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility library to add SSL support to the RPC server."""
import logging
import ssl
import subprocess
import tempfile
from legion.lib import common_lib
from legion.lib.rpc import jsonrpclib
from legion.lib.rpc import SimpleJSONRPCServer
class Error(Exception):
pass
def CreateKeyFile():
"""Creates an SSL keyfile and returns the path."""
keyfile = tempfile.mkstemp()[1]
cmd = [
'openssl',
'genrsa',
'-out', keyfile,
'2048'
]
_RunCommand(cmd)
return keyfile
def CreateCsrFile(keyfile):
"""Creates an SSL CSR file and returns the path."""
csrfile = tempfile.mkstemp()[1]
cmd = [
'openssl',
'req',
'-new',
'-key', keyfile,
'-out', csrfile,
'-subj', '/C=NA/ST=NA/L=NA/O=Chromium/OU=Test/CN=chromium.org'
]
_RunCommand(cmd)
return csrfile
def CreateCrtFile(keyfile, csrfile):
"""Creates an SSL CRT file and returns the path."""
crtfile = tempfile.mkstemp()[1]
cmd = [
'openssl',
'x509',
'-req',
'-days', '1',
'-in', csrfile,
'-signkey', keyfile,
'-out', crtfile
]
_RunCommand(cmd)
return crtfile
def CreatePemFile():
"""Creates an SSL PEM file and returns the path."""
keyfile = CreateKeyFile()
csrfile = CreateCsrFile(keyfile)
crtfile = CreateCrtFile(keyfile, csrfile)
pemfile = tempfile.mkstemp()[1]
with open(keyfile) as k:
with open(crtfile) as c:
with open(pemfile, 'wb') as p:
p.write('%s\n%s' % (k.read(), c.read()))
return pemfile
def _RunCommand(cmd):
try:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except OSError as e:
raise Error('Failed to run %s: %s' % (' '.join(cmd), e))
out, err = p.communicate()
if p.returncode != 0:
raise Error(err)
return out
class SslRpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer):
"""Class to add SSL support to the RPC server."""
def __init__(self, *args, **kwargs):
SimpleJSONRPCServer.SimpleJSONRPCServer.__init__(self, *args, **kwargs)
self.socket = ssl.wrap_socket(self.socket, certfile=CreatePemFile(),
server_side=True)
@staticmethod
def Connect(server, port=common_lib.SERVER_PORT):
"""Creates and returns a connection to an SSL RPC server."""
addr = 'https://%s:%d' % (server, port)
logging.debug('Connecting to RPC server at %s', addr)
return jsonrpclib.ServerProxy(addr, allow_none=True)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the task controller library."""
import argparse
import datetime
import logging
import os
import socket
import subprocess
import sys
import threading
from legion.lib import common_lib
from legion.lib import process
from legion.lib.rpc import rpc_server
from legion.lib.rpc import jsonrpclib
ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
class Error(Exception):
pass
class ConnectionTimeoutError(Error):
pass
class TaskController(object):
"""Provisions, configures, and controls a task machine.
This class is an abstraction of a physical task machine. It provides an
end to end API for controlling a task machine. Operations on the task machine
are performed using the instance's "rpc" property. A simple end to end
scenario is as follows:
task = TaskController(...)
task.Create()
task.WaitForConnection()
proc = task.rpc.subprocess.Popen(['ls'])
print task.rpc.subprocess.GetStdout(proc)
task.Release()
"""
_task_count = 0
_tasks = []
def __init__(self, isolated_hash, dimensions, reg_server_port, priority=100,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
verbosity='ERROR', name=None, run_id=None):
assert isinstance(dimensions, dict)
type(self)._tasks.append(self)
type(self)._task_count += 1
self.verbosity = verbosity
self._name = name or 'Task%d' % type(self)._task_count
self._priority = priority
self._isolated_hash = isolated_hash
self._idle_timeout_secs = idle_timeout_secs
self._dimensions = dimensions
self._connect_event = threading.Event()
self._connected = False
self._ip_address = None
self._reg_server_port = reg_server_port
self._otp = self._CreateOTP()
self._rpc = None
self._output_dir = None
self._platform = None
self._executable = None
self._task_rpc_port = None
run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
self._task_name = '%s/%s/%s' % (
os.path.splitext(sys.argv[0])[0], self._name, run_id)
parser = argparse.ArgumentParser()
parser.add_argument('--isolate-server')
parser.add_argument('--swarming-server')
parser.add_argument('--task-connection-timeout-secs',
default=common_lib.DEFAULT_TIMEOUT_SECS)
args, _ = parser.parse_known_args()
self._isolate_server = args.isolate_server
self._swarming_server = args.swarming_server
self._connection_timeout_secs = (connection_timeout_secs or
args.task_connection_timeout_secs)
# Register for the shutdown event
common_lib.OnShutdown += self.Release
@property
def name(self):
return self._name
@property
def otp(self):
return self._otp
@property
def connected(self):
return self._connected
@property
def connect_event(self):
return self._connect_event
@property
def rpc(self):
return self._rpc
@property
def verbosity(self):
return self._verbosity
@verbosity.setter
def verbosity(self, level):
"""Sets the verbosity level as a string.
Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
logging.DEBUG, etc) is allowed.
"""
assert isinstance(level, (str, int))
if isinstance(level, int):
level = logging.getLevelName(level)
self._verbosity = level #pylint: disable=attribute-defined-outside-init
@property
def output_dir(self):
if not self._output_dir:
self._output_dir = self.rpc.GetOutputDir()
return self._output_dir
@property
def platform(self):
if not self._platform:
self._platform = self._rpc.GetPlatform()
return self._platform
@property
def ip_address(self):
if not self._ip_address:
self._ip_address = self.rpc.GetIpAddress()
return self._ip_address
@property
def executable(self):
if not self._executable:
self._executable = self.rpc.GetExecutable()
return self._executable
@classmethod
def ReleaseAllTasks(cls):
for task in cls._tasks:
task.Release()
def Process(self, cmd, *args, **kwargs):
return process.ControllerProcessWrapper(self.rpc, cmd, *args, **kwargs)
def _CreateOTP(self):
"""Creates the OTP."""
controller_name = socket.gethostname()
test_name = os.path.basename(sys.argv[0])
creation_time = datetime.datetime.utcnow()
otp = 'task:%s controller:%s port: %d test:%s creation:%s' % (
self._name, controller_name, self._reg_server_port, test_name,
creation_time)
return otp
def Create(self):
"""Creates the task machine."""
logging.info('Creating %s', self.name)
self._connect_event.clear()
self._ExecuteSwarming()
def WaitForConnection(self):
"""Waits for the task machine to connect.
Raises:
ConnectionTimeoutError if the task doesn't connect in time.
"""
logging.info('Waiting for %s to connect with a timeout of %d seconds',
self._name, self._connection_timeout_secs)
self._connect_event.wait(self._connection_timeout_secs)
if not self._connect_event.is_set():
raise ConnectionTimeoutError('%s failed to connect' % self.name)
def Release(self):
"""Quits the task's RPC server so it can release the machine."""
if self._rpc is not None and self._connected:
logging.info('Copying output-dir files to controller')
self.RetrieveOutputFiles()
logging.info('Releasing %s', self._name)
try:
self._rpc.Quit()
except (socket.error, jsonrpclib.Fault):
logging.error('Unable to connect to %s to call Quit', self.name)
self._rpc = None
self._connected = False
def _ExecuteSwarming(self):
"""Executes swarming.py."""
cmd = [
sys.executable,
SWARMING_PY,
'trigger',
self._isolated_hash,
'--priority', str(self._priority),
'--task-name', self._task_name,
]
if self._isolate_server:
cmd.extend(['--isolate-server', self._isolate_server])
if self._swarming_server:
cmd.extend(['--swarming', self._swarming_server])
for key, value in self._dimensions.iteritems():
cmd.extend(['--dimension', key, value])
cmd.extend([
'--',
'--controller', common_lib.MY_IP,
'--controller-port', str(self._reg_server_port),
'--otp', self._otp,
'--verbosity', self._verbosity,
'--idle-timeout', str(self._idle_timeout_secs),
'--output-dir', '${ISOLATED_OUTDIR}'
])
self._ExecuteProcess(cmd)
def _ExecuteProcess(self, cmd):
"""Executes a process, waits for it to complete, and checks for success."""
logging.debug('Running %s', ' '.join(cmd))
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_, stderr = p.communicate()
if p.returncode != 0:
raise Error(stderr)
def OnConnect(self, ip_address, rpc_port):
"""Receives task ip address and port on connection."""
self._ip_address = ip_address
self._task_rpc_port = rpc_port
self._connected = True
self._rpc = rpc_server.RpcServer.Connect(self._ip_address,
self._task_rpc_port)
logging.info('%s connected from %s:%s', self._name, ip_address,
self._task_rpc_port)
self._connect_event.set()
def RetrieveOutputFiles(self):
"""Retrieves all files in the output-dir."""
files = self.rpc.ListDir(self.output_dir)
for fname in files:
remote_path = self.rpc.PathJoin(self.output_dir, fname)
local_name = os.path.join(common_lib.GetOutputDir(),
'%s.%s' % (self.name, fname))
contents = self.rpc.ReadFile(remote_path)
with open(local_name, 'wb+') as fh:
fh.write(contents)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The registration server used to register tasks.
The registration server is started by the test controller and allows the tasks
to register themselves when they start. Authentication of the tasks controllers
is based on an OTP passed to the run_task binary on startup.
"""
import logging
import threading
from legion.lib import common_lib
from legion.lib.rpc import SimpleJSONRPCServer
class TaskRegistrationServer(object):
"""Discovery server run on the host."""
def __init__(self):
self._expected_tasks = {}
self._rpc_server = None
self._thread = None
self._port = common_lib.GetUnusedPort()
@property
def port(self):
return self._port
def _RegisterTaskRPC(self, otp, ip, port):
"""The RPC used by a task to register with the registration server."""
assert otp in self._expected_tasks
cb = self._expected_tasks.pop(otp)
cb(ip, port)
def RegisterTaskCallback(self, otp, callback):
"""Registers a callback associated with an OTP."""
assert callable(callback)
self._expected_tasks[otp] = callback
def Start(self):
"""Starts the registration server."""
logging.info('Starting task registration server')
self._rpc_server = SimpleJSONRPCServer.SimpleJSONRPCServer(
('', self._port), allow_none=True, logRequests=False)
self._rpc_server.register_function(
self._RegisterTaskRPC, 'RegisterTask')
self._thread = threading.Thread(target=self._rpc_server.serve_forever)
self._thread.start()
def Shutdown(self):
"""Shuts the discovery server down."""
if self._thread and self._thread.is_alive():
logging.info('Shutting down task registration server')
self._rpc_server.shutdown()
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The main task entrypoint."""
import argparse
import logging
import socket
import sys
import time
# pylint: disable=relative-import
# Import common_lib first so we can setup the environment
from lib import common_lib
common_lib.SetupEnvironment()
from legion.lib.rpc import rpc_server
def main():
print ' '.join(sys.argv)
common_lib.InitLogging()
logging.info('Task starting')
parser = argparse.ArgumentParser()
parser.add_argument('--otp',
help='One time token used to authenticate with the host')
parser.add_argument('--controller',
help='The ip address of the controller machine')
parser.add_argument('--controller-port', type=int,
help='The port the controllers registration server is on')
parser.add_argument('--idle-timeout', type=int,
default=common_lib.DEFAULT_TIMEOUT_SECS,
help='The idle timeout for the rpc server in seconds')
args, _ = parser.parse_known_args()
my_port = common_lib.GetUnusedPort()
logging.info(
'Registering with registration server at %s:%d using OTP "%s"',
args.controller, args.controller_port, args.otp)
rpc_server.RpcServer.Connect(
args.controller, args.controller_port).RegisterTask(
args.otp, common_lib.MY_IP, my_port)
server = rpc_server.RpcServer(args.controller, my_port, args.idle_timeout)
server.serve_forever()
logging.info('Server shutdown complete')
return 0
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A helper module to run Legion multi-machine tests.
Example usage with 1 task machine:
$ testing/legion/tools/legion.py run \
--controller-isolated out/Release/example_test_controller.isolated \
--dimension os Ubuntu-14.04 \
--task-name test-task-name \
--task task_machine out/Release/example_task_machine.isolated
Example usage with 2 task machines with the same isolated file:
$ testing/legion/tools/legion.py run \
--controller-isolated out/Release/example_test_controller.isolated \
--dimension os Ubuntu-14.04 \
--task-name test-task-name \
--task task_machine_1 out/Release/example_task_machine.isolated \
--task task_machine_2 out/Release/example_task_machine.isolated
Example usage with 2 task machines with different isolated file:
$ testing/legion/tools/legion.py run \
--controller-isolated out/Release/example_test_controller.isolated \
--dimension os Ubuntu-14.04 \
--task-name test-task-name \
--task task_machine_1 out/Release/example_task_machine_1.isolated \
--task task_machine_2 out/Release/example_task_machine_2.isolated
"""
import argparse
import logging
import os
import subprocess
import sys
THIS_DIR = os.path.split(__file__)[0]
SWARMING_DIR = os.path.join(THIS_DIR, '..', '..', '..', 'tools',
'swarming_client')
ISOLATE_PY = os.path.join(SWARMING_DIR, 'isolate.py')
SWARMING_PY = os.path.join(SWARMING_DIR, 'swarming.py')
LOGGING_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'ERROR']
class Error(Exception):
pass
class ArgumentError(Error):
pass
def GetArgs(cmd_args):
default_dimension = ['pool', 'Legion']
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('action', choices=['run', 'trigger'],
help='The swarming action to perform.')
parser.add_argument('-f', '--format-only', action='store_true',
help='If true the .isolated files are archived but '
'swarming is not called, only the command line is built.')
parser.add_argument('--controller-isolated', required=True,
help='The isolated file for the test controller.')
parser.add_argument('--isolate-server', help='Optional. The isolated server '
'to use.', default=os.environ.get('ISOLATE_SERVER', ''))
parser.add_argument('--swarming-server', help='Optional. The swarming server '
'to use.', default=os.environ.get('SWARMING_SERVER', ''))
parser.add_argument('--task-name', help='Optional. The swarming task name '
'to use.')
parser.add_argument('--dimension', action='append', dest='dimensions',
nargs=2, default=default_dimension,
help='Dimensions to pass to '
'swarming.py. This is in the form of --dimension key '
'value. The minimum required is --dimension os <OS>')
parser.add_argument('--task', action='append', dest='tasks',
nargs=2, default=[], help='List of task names used in '
'the test controller. This is in the form of --task name '
'.isolated and is passed to the controller as --name '
'<ISOLATED HASH>.')
parser.add_argument('--controller-var', action='append',
dest='controller_vars', nargs=2, default=[],
help='Command line vars to pass to the controller. These '
'are in the form of --controller-var name value and are '
'passed to the controller as --name value.')
parser.add_argument('-v', '--verbosity', default=0, action='count')
return parser.parse_args(cmd_args)
def RunCommand(cmd, stream_stdout=False):
"""Runs the command line and streams stdout if requested."""
kwargs = {
'args': cmd,
'stderr': subprocess.PIPE,
}
if not stream_stdout:
kwargs['stdout'] = subprocess.PIPE
p = subprocess.Popen(**kwargs)
stdout, stderr = p.communicate()
if p.returncode:
raise Error(stderr)
if not stream_stdout:
logging.debug(stdout)
return stdout
def Archive(isolated, isolate_server):
"""Calls isolate.py archive with the given args."""
cmd = [
sys.executable,
ISOLATE_PY,
'archive',
'--isolated', isolated,
]
cmd.extend(['--isolate-server', isolate_server])
print ' '.join(cmd)
return RunCommand(cmd).split()[0] # The isolated hash
def GetSwarmingCommandLine(args, extra_args):
"""Builds and returns the command line for swarming.py run|trigger."""
cmd = [
sys.executable,
SWARMING_PY,
args.action,
args.controller_isolated,
]
cmd.extend(['--isolate-server', args.isolate_server])
cmd.extend(['--swarming', args.swarming_server])
if args.task_name:
cmd.extend(['--task-name', args.task_name])
# swarming.py dimensions
for name, value in args.dimensions:
cmd.extend(['--dimension', name, value])
cmd.append('--')
cmd.extend(extra_args)
cmd.extend(['--swarming-server', args.swarming_server])
cmd.extend(['--isolate-server', args.isolate_server])
# Specify the output dir
cmd.extend(['--output-dir', '${ISOLATED_OUTDIR}'])
# Task name/hash values
for name, isolated in args.tasks:
if args.format_only:
cmd.extend(['--' + name, isolated + '_test_only'])
else:
cmd.extend(['--' + name, Archive(isolated, args.isolate_server)])
# Test controller args
for name, value in args.controller_vars:
cmd.extend(['--' + name, value])
print ' '.join(cmd)
return cmd
def main():
if '--' not in sys.argv:
cmd_args = sys.argv[1:]
extra_args = []
else:
index = sys.argv.index('--')
cmd_args = sys.argv[1:index]
extra_args = sys.argv[index+1:]
args = GetArgs(cmd_args)
if not args.swarming_server:
raise ArgumentError('Missing required argument: --swarming-server')
if not args.isolate_server:
raise ArgumentError('Missing required argument: --isolate-server')
logging.basicConfig(
format='%(asctime)s %(filename)s:%(lineno)s %(levelname)s] %(message)s',
datefmt='%H:%M:%S',
level=LOGGING_LEVELS[len(LOGGING_LEVELS)-args.verbosity-1])
cmd = GetSwarmingCommandLine(args, extra_args)
if not args.format_only:
RunCommand(cmd, True)
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Local unittest for legion.lib.comm_server."""
import httplib
# pylint: disable=relative-import
import legion_unittest
from legion.lib.comm_server import comm_server
class CommServerTest(legion_unittest.TestCase):
def setUp(self):
super(CommServerTest, self).setUp()
self.server = comm_server.CommServer()
self.server.start()
def tearDown(self):
try:
self.server.shutdown()
finally:
super(CommServerTest, self).tearDown()
def Connect(self, verb, path, message=''):
conn = httplib.HTTPConnection('localhost', self.server.port)
conn.request(verb, path, body=message)
return conn.getresponse()
def testMessagesUsedAsSignals(self):
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 404)
self.assertEquals(
self.Connect('PUT', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message1').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message1').status, 404)
self.assertEquals(
self.Connect('GET', '/messages/message1').status, 404)
def testErrors(self):
for verb in ['GET', 'PUT', 'DELETE']:
self.assertEquals(
self.Connect(verb, '/').status, 403)
self.assertEquals(
self.Connect(verb, '/foobar').status, 403)
self.assertEquals(
self.Connect(verb, '/foobar/').status, 405)
def testMessagePassing(self):
self.assertEquals(
self.Connect('GET', '/messages/message2').status, 404)
self.assertEquals(
self.Connect('PUT', '/messages/message2', 'foo').status, 200)
self.assertEquals(
self.Connect('GET', '/messages/message2').read(), 'foo')
self.assertEquals(
self.Connect('DELETE', '/messages/message2').status, 200)
self.assertEquals(
self.Connect('DELETE', '/messages/message2').status, 404)
self.assertEquals(
self.Connect('GET', '/messages/message2').status, 404)
if __name__ == '__main__':
legion_unittest.main()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Base test class for Legion-specific unittests.
Currently this module is only needed to setup the import paths for the
unittests. This will allow unittests to use the import format:
from legion.foo import bar
Using this base class for all unittests allows for easier extensibility in
the future.
"""
import os
import sys
import unittest
# Setup import paths
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
LEGION_IMPORT_FIX = os.path.join(THIS_DIR, '..', '..')
sys.path.append(LEGION_IMPORT_FIX)
class TestCase(unittest.TestCase):
pass
def main():
unittest.main(verbosity=0, argv=sys.argv[:1])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment