Commit abebda49 authored by mmeade's avatar mmeade Committed by Commit bot

Adding the initial code for Omnibot multi-machine support.

This cl contains the base code plus a "simple" hello world example test.

BUG=453679

Review URL: https://codereview.chromium.org/890773003

Cr-Commit-Position: refs/heads/master@{#315080}
parent 809e0233
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The main client_controller code.
This code is the main entry point for the client machines and handles
registering with the host server and running the local RPC server.
"""
import argparse
import logging
import socket
import sys
import time
#pylint: disable=relative-import
import client_rpc_server
import common_lib
def main():
print ' '.join(sys.argv)
common_lib.InitLogging()
logging.info('Client controller starting')
parser = argparse.ArgumentParser()
parser.add_argument('--otp',
help='One time token used to authenticate with the host')
parser.add_argument('--host',
help='The ip address of the host')
parser.add_argument('--idle-timeout', type=int,
default=common_lib.DEFAULT_TIMEOUT_SECS,
help='The idle timeout for the rpc server in seconds')
args, _ = parser.parse_known_args()
logging.info(
'Registering with discovery server at %s using OTP %s', args.host,
args.otp)
server = common_lib.ConnectToServer(args.host).RegisterClient(
args.otp, common_lib.MY_IP)
server = client_rpc_server.RPCServer(args.host, args.idle_timeout)
server.serve_forever()
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the client library."""
import argparse
import datetime
import logging
import os
import socket
import subprocess
import sys
import tempfile
import threading
import xmlrpclib
#pylint: disable=relative-import
import common_lib
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
SWARMING_DIR = os.path.join(THIS_DIR, '..', '..', 'tools/swarming_client')
ISOLATE_PY = os.path.join(SWARMING_DIR, 'isolate.py')
SWARMING_PY = os.path.join(SWARMING_DIR, 'swarming.py')
class Error(Exception):
pass
class ConnectionTimeoutError(Error):
pass
class ClientController(object):
"""Creates, configures, and controls a client machine."""
_client_count = 0
_controllers = []
def __init__(self, isolate_file, config_vars, dimensions, priority=100,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
verbosity='ERROR', name=None):
assert isinstance(config_vars, dict)
assert isinstance(dimensions, dict)
type(self)._controllers.append(self)
type(self)._client_count += 1
self.verbosity = verbosity
self._name = name or 'Client%d' % type(self)._client_count
self._priority = priority
self._isolate_file = isolate_file
self._isolated_file = isolate_file + 'd'
self._idle_timeout_secs = idle_timeout_secs
self._config_vars = config_vars
self._dimensions = dimensions
self._connect_event = threading.Event()
self._connected = False
self._ip_address = None
self._otp = self._CreateOTP()
self._rpc = None
parser = argparse.ArgumentParser()
parser.add_argument('--isolate-server')
parser.add_argument('--swarming-server')
parser.add_argument('--client-connection-timeout-secs',
default=common_lib.DEFAULT_TIMEOUT_SECS)
args, _ = parser.parse_known_args()
self._isolate_server = args.isolate_server
self._swarming_server = args.swarming_server
self._connection_timeout_secs = (connection_timeout_secs or
args.client_connection_timeout_secs)
@property
def name(self):
return self._name
@property
def otp(self):
return self._otp
@property
def connected(self):
return self._connected
@property
def connect_event(self):
return self._connect_event
@property
def rpc(self):
return self._rpc
@property
def verbosity(self):
return self._verbosity
@verbosity.setter
def verbosity(self, level):
"""Sets the verbosity level as a string.
Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
logging.DEBUG, etc) is allowed.
"""
assert isinstance(level, (str, int))
if isinstance(level, int):
level = logging.getLevelName(level)
self._verbosity = level #pylint: disable=attribute-defined-outside-init
@classmethod
def ReleaseAllControllers(cls):
for controller in cls._controllers:
controller.Release()
def _CreateOTP(self):
"""Creates the OTP."""
host_name = socket.gethostname()
test_name = os.path.basename(sys.argv[0])
creation_time = datetime.datetime.utcnow()
otp = 'client:%s-host:%s-test:%s-creation:%s' % (
self._name, host_name, test_name, creation_time)
return otp
def Create(self):
"""Creates the client machine."""
logging.info('Creating %s', self.name)
self._connect_event.clear()
self._ExecuteIsolate()
self._ExecuteSwarming()
def WaitForConnection(self):
"""Waits for the client machine to connect.
Raises:
ConnectionTimeoutError if the client doesn't connect in time.
"""
logging.info('Waiting for %s to connect with a timeout of %d seconds',
self._name, self._connection_timeout_secs)
self._connect_event.wait(self._connection_timeout_secs)
if not self._connect_event.is_set():
raise ConnectionTimeoutError('%s failed to connect' % self.name)
def Release(self):
"""Quits the client's RPC server so it can release the machine."""
if self._rpc is not None and self._connected:
logging.info('Releasing %s', self._name)
try:
self._rpc.Quit()
except (socket.error, xmlrpclib.Fault):
logging.error('Unable to connect to %s to call Quit', self.name)
self._rpc = None
self._connected = False
def _ExecuteIsolate(self):
"""Executes isolate.py."""
cmd = [
'python',
ISOLATE_PY,
'archive',
'--isolate', self._isolate_file,
'--isolated', self._isolated_file,
]
if self._isolate_server:
cmd.extend(['--isolate-server', self._isolate_server])
for key, value in self._config_vars.iteritems():
cmd.extend(['--config-var', key, value])
self._ExecuteProcess(cmd)
def _ExecuteSwarming(self):
"""Executes swarming.py."""
cmd = [
'python',
SWARMING_PY,
'trigger',
self._isolated_file,
'--priority', str(self._priority),
]
if self._isolate_server:
cmd.extend(['--isolate-server', self._isolate_server])
if self._swarming_server:
cmd.extend(['--swarming', self._swarming_server])
for key, value in self._dimensions.iteritems():
cmd.extend(['--dimension', key, value])
cmd.extend([
'--',
'--host', common_lib.MY_IP,
'--otp', self._otp,
'--verbosity', self._verbosity,
'--idle-timeout', str(self._idle_timeout_secs),
])
self._ExecuteProcess(cmd)
def _ExecuteProcess(self, cmd):
"""Executes a process, waits for it to complete, and checks for success."""
logging.debug('Running %s', ' '.join(cmd))
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
_, stderr = p.communicate()
if p.returncode != 0:
stderr.seek(0)
raise Error(stderr)
def OnConnect(self, ip_address):
"""Receives client ip address on connection."""
self._ip_address = ip_address
self._connected = True
self._rpc = common_lib.ConnectToServer(self._ip_address)
logging.info('%s connected from %s', self._name, ip_address)
self._connect_event.set()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the client RPC methods."""
import logging
import subprocess
import threading
class RPCMethods(object):
"""Class exposing RPC methods."""
def __init__(self, server):
self.server = server
def Echo(self, message):
"""Simple RPC method to print and return a message."""
logging.info('Echoing %s', message)
return 'echo %s' % str(message)
def Subprocess(self, cmd):
"""Run the commands in a subprocess.
Returns:
(returncode, stdout, stderr).
"""
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
return (p.returncode, stdout, stderr)
def Quit(self):
"""Call server.shutdown in another thread.
This is needed because server.shutdown waits for the server to actually
quit. However the server cannot shutdown until it completes handling this
call. Calling this in the same thread results in a deadlock.
"""
t = threading.Thread(target=self.server.shutdown)
t.start()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The client RPC server code.
This server is an XML-RPC server which serves code from
client_rpc_methods.RPCMethods.
This server will run until shutdown is called on the server object. This can
be achieved in 2 ways:
- Calling the Quit RPC method defined in RPCMethods
- Not receiving any calls within the idle_timeout_secs time.
"""
import logging
import threading
import time
import xmlrpclib
import SimpleXMLRPCServer
import SocketServer
#pylint: disable=relative-import
import client_rpc_methods
import common_lib
class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
"""Restricts access to only specified IP address.
This call assumes the server is RPCServer.
"""
def do_POST(self):
"""Verifies the client is authorized to perform RPCs."""
if self.client_address[0] != self.server.authorized_address:
logging.error('Received unauthorized RPC request from %s',
self.client_address[0])
self.send_response(403)
response = 'Forbidden'
self.send_header('Content-type', 'text/plain')
self.send_header('Content-length', str(len(response)))
self.end_headers()
self.wfile.write(response)
else:
return SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
class RPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer,
SocketServer.ThreadingMixIn):
"""Restricts all endpoints to only specified IP addresses."""
def __init__(self, authorized_address,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS):
SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
self, (common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT),
allow_none=True, logRequests=False,
requestHandler=RequestHandler)
self.authorized_address = authorized_address
self.idle_timeout_secs = idle_timeout_secs
self.register_instance(client_rpc_methods.RPCMethods(self))
self._shutdown_requested_event = threading.Event()
self._rpc_received_event = threading.Event()
self._idle_thread = threading.Thread(target=self._CheckForIdleQuit)
def shutdown(self):
"""Shutdown the server.
This overloaded method sets the _shutdown_requested_event to allow the
idle timeout thread to quit.
"""
self._shutdown_requested_event.set()
SimpleXMLRPCServer.SimpleXMLRPCServer.shutdown(self)
logging.info('Server shutdown complete')
def serve_forever(self, poll_interval=0.5):
"""Serve forever.
This overloaded method starts the idle timeout thread before calling
serve_forever. This ensures the idle timer thread doesn't get started
without the server running.
Args:
poll_interval: The interval to poll for shutdown.
"""
logging.info('RPC server starting')
self._idle_thread.start()
SimpleXMLRPCServer.SimpleXMLRPCServer.serve_forever(self, poll_interval)
def _dispatch(self, method, params):
"""Dispatch the call to the correct method with the provided params.
This overloaded method adds logging to help trace connection and
call problems.
Args:
method: The method name to call.
params: A tuple of parameters to pass.
Returns:
The result of the parent class' _dispatch method.
"""
logging.debug('Calling %s%s', method, params)
self._rpc_received_event.set()
return SimpleXMLRPCServer.SimpleXMLRPCServer._dispatch(self, method, params)
def _CheckForIdleQuit(self):
"""Check for, and exit, if the server is idle for too long.
This method must be run in a separate thread to avoid a deadlock when
calling server.shutdown.
"""
timeout = time.time() + self.idle_timeout_secs
while time.time() < timeout:
if self._shutdown_requested_event.is_set():
# An external source called shutdown()
return
elif self._rpc_received_event.is_set():
logging.debug('Resetting the idle timeout')
timeout = time.time() + self.idle_timeout_secs
self._rpc_received_event.clear()
time.sleep(1)
# We timed out, kill the server
logging.warning('Shutting down the server due to the idle timeout')
self.shutdown()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common library methods used by both host and client controllers."""
import argparse
import logging
import socket
import xmlrpclib
LOGGING_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'WARN', 'ERROR']
MY_IP = socket.gethostbyname(socket.gethostname())
SERVER_ADDRESS = ''
SERVER_PORT = 31710
DEFAULT_TIMEOUT_SECS = 20 * 60 # 30 minutes
def InitLogging():
"""Initialize the logging module.
Raises:
argparse.ArgumentError if the --verbosity arg is incorrect.
"""
parser = argparse.ArgumentParser()
logging_action = parser.add_argument('--verbosity', default='ERROR')
args, _ = parser.parse_known_args()
if args.verbosity not in LOGGING_LEVELS:
raise argparse.ArgumentError(
logging_action, 'Only levels %s supported' % str(LOGGING_LEVELS))
logging.basicConfig(
format='%(asctime)s %(filename)s:%(lineno)s %(levelname)s] %(message)s',
datefmt='%H:%M:%S', level=args.verbosity)
def ConnectToServer(server):
"""Connect to an RPC server."""
addr = 'http://%s:%d' % (server, SERVER_PORT)
logging.debug('Connecting to RPC server at %s', addr)
return xmlrpclib.Server(addr)
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The discovery server used to register clients.
The discovery server is started by the host controller and allows the clients
to register themselves when they start. Authentication of the client controllers
is based on an OTP passed to the client controller binary on startup.
"""
import logging
import threading
import xmlrpclib
import SimpleXMLRPCServer
#pylint: disable=relative-import
import common_lib
class DiscoveryServer(object):
"""Discovery server run on the host."""
def __init__(self):
self._expected_clients = {}
self._rpc_server = None
self._thread = None
def _RegisterClientRPC(self, otp, ip):
"""The RPC used by a client to register with the discovery server."""
assert otp in self._expected_clients
cb = self._expected_clients.pop(otp)
cb(ip)
def RegisterClientCallback(self, otp, callback):
"""Registers a callback associated with an OTP."""
assert callable(callback)
self._expected_clients[otp] = callback
def Start(self):
"""Starts the discovery server."""
logging.debug('Starting discovery server')
self._rpc_server = SimpleXMLRPCServer.SimpleXMLRPCServer(
(common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT),
allow_none=True, logRequests=False)
self._rpc_server.register_function(
self._RegisterClientRPC, 'RegisterClient')
self._thread = threading.Thread(target=self._rpc_server.serve_forever)
self._thread.start()
def Shutdown(self):
"""Shuts the discovery server down."""
if self._thread and self._thread.is_alive():
logging.debug('Shutting down discovery server')
self._rpc_server.shutdown()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
{
'includes': [
'../../legion.isolate'
],
'conditions': [
['multi_machine == 1', {
'variables': {
'command': [
'python',
'../../client_controller.py',
],
'files': [
'client_test.py',
'client_test.isolate'
],
},
}],
],
}
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A simple client test module.
This module is invoked by the host by calling the client controller's
Subprocess RPC method. The name is passed in as a required argument on the
command line.
"""
import argparse
import os
import sys
def main():
parser = argparse.ArgumentParser()
parser.add_argument('name')
args = parser.parse_args()
print 'Hello world from', args.name
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
{
'includes': [
'../../legion.isolate',
'client_test.isolate'
],
'conditions': [
['multi_machine == 1', {
'variables': {
'command': [
'host_test.py',
],
'files': [
'host_test.py',
],
},
}],
]
}
#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A simple host test module.
This module runs on the host machine and is responsible for creating 2
client machines, waiting for them, and running RPC calls on them.
"""
# Map the legion directory so we can import the host controller.
import sys
sys.path.append('../../')
import logging
import time
import host_controller
class ExampleController(host_controller.HostController):
"""A simple example controller for a test."""
def __init__(self):
super(ExampleController, self).__init__()
self.client1 = None
self.client2 = None
def CreateClient(self):
"""Create a client object and set the proper values."""
client = self.NewClient(
isolate_file='client_test.isolate',
config_vars={'multi_machine': '1'},
dimensions={'os': 'Linux', 'pool': 'legion'}, priority=200,
idle_timeout_secs=90, connection_timeout_secs=90,
verbosity=logging.INFO)
client.Create()
return client
def SetUp(self):
"""Create the client machines and wait until they connect.
In this call the actual creation of the client machines is done in parallel
by the system. The WaitForConnect calls are performed in series but will
return as soon as the clients connect.
"""
self.client1 = self.CreateClient()
self.client2 = self.CreateClient()
self.client1.WaitForConnection()
self.client2.WaitForConnection()
def Task(self):
"""Main method to run the task code."""
self.CallEcho(self.client1)
self.CallEcho(self.client2)
self.CallClientTest(self.client1)
self.CallClientTest(self.client2)
def CallEcho(self, client):
"""Call rpc.Echo on a client."""
logging.info('Calling Echo on %s', client.name)
logging.info(self.client1.rpc.Echo(client.name))
def CallClientTest(self, client):
"""Call client_test.py name on a client."""
logging.info('Calling Subprocess to run "./client_test.py %s"', client.name)
retcode, stdout, stderr = client.rpc.Subprocess(
['./client_test.py', client.name])
logging.info('retcode: %s, stdout: %s, stderr: %s', retcode, stdout, stderr)
if __name__ == '__main__':
ExampleController().RunController()
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the host controller base library.
This module is the basis on which host controllers are built and executed.
"""
import logging
import sys
#pylint: disable=relative-import
import client_lib
import common_lib
import discovery_server
class HostController(object):
"""The base host controller class."""
def __init__(self):
self._discovery_server = discovery_server.DiscoveryServer()
def SetUp(self):
"""Setup method used by the subclass."""
pass
def Task(self):
"""Main task method used by the subclass."""
pass
def TearDown(self):
"""Teardown method used by the subclass."""
pass
def NewClient(self, *args, **kwargs):
controller = client_lib.ClientController(*args, **kwargs)
self._discovery_server.RegisterClientCallback(
controller.otp, controller.OnConnect)
return controller
def RunController(self):
"""Main entry point for the controller."""
print ' '.join(sys.argv)
common_lib.InitLogging()
self._discovery_server.Start()
error = None
tb = None
try:
self.SetUp()
self.Task()
except Exception as e:
# Defer raising exceptions until after TearDown and _TearDown are called.
error = e
tb = sys.exc_info()[-1]
try:
self.TearDown()
except Exception as e:
# Defer raising exceptions until after _TearDown is called.
# Note that an error raised here will obscure any errors raised
# previously.
error = e
tb = sys.exc_info()[-1]
self._discovery_server.Shutdown()
client_lib.ClientController.ReleaseAllControllers()
if error:
raise error, None, tb #pylint: disable=raising-bad-type
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
{
'variables': {
'files': [
'__init__.py',
'client_controller.py',
'client_lib.py',
'client_rpc_methods.py',
'client_rpc_server.py',
'common_lib.py',
'discovery_server.py',
'host_controller.py',
'legion.isolate',
'../../tools/swarming_client/',
],
},
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment