Commit ca0e3774 authored by Mikhail Khokhlov, committed by Commit Bot

[tools/perf] Factor out common code into util.ApplyInParallel

Bug: 981349
Change-Id: I3a9231abd80cfb9098ac3b8e7d261e70547a0c88
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1852245
Reviewed-by: Juan Antonio Navarro Pérez <perezju@chromium.org>
Commit-Queue: Mikhail Khokhlov <khokhlov@google.com>
Cr-Commit-Position: refs/heads/master@{#704573}
parent 7f8e26a2
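
In short, this change moves the boilerplate that each caller of util.ApplyInParallel used to repeat, the early return for an empty work list and the exception-logging wrapper around the worker, into ApplyInParallel itself. A rough sketch of the resulting call pattern (SquareWorker and the numeric work list are made-up stand-ins for _PoolWorker / PoolUploader and their inputs, and the import path for util is an assumption, not taken from this diff):

    # Illustrative sketch only; not the actual Chromium callers.
    from core.results_processor import util  # assumed import path

    def SquareWorker(item):
      # Workers no longer need their own try/except or empty-list check:
      # ApplyInParallel logs any exception (with stack trace) and re-raises.
      return item * item

    results = set(util.ApplyInParallel(SquareWorker, [1, 2, 3]))
    assert results == {1, 4, 9}  # results are yielded in arbitrary order

    # An empty work list now simply yields nothing, so callers can drop
    # their own 'if not work_list' guards.
    assert list(util.ApplyInParallel(SquareWorker, [])) == []
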
@@ -25,43 +25,35 @@ HISTOGRAM_DICTS_FILE = 'histogram_dicts.json'
 def _PoolWorker(test_result):
-  try:
-    metrics = [tag['value'] for tag in test_result['tags']
-               if tag['key'] == 'tbmv2']
-    html_trace = test_result['outputArtifacts'][HTML_TRACE_NAME]
-    html_local_path = html_trace['filePath']
-    html_remote_url = html_trace['remoteUrl']
-
-    logging.info('%s: Starting to compute metrics on trace.',
-                 test_result['testPath'])
-    start = time.time()
-    # The timeout needs to be coordinated with the Swarming IO timeout for the
-    # task that runs this code. If this timeout is longer or close in length
-    # to the swarming IO timeout then we risk being forcibly killed for not
-    # producing any output. Note that this could be fixed by periodically
-    # outputting logs while waiting for metrics to be calculated.
-    TEN_MINUTES = 60 * 10
-    mre_result = metric_runner.RunMetricOnSingleTrace(
-        html_local_path, metrics, canonical_url=html_remote_url,
-        timeout=TEN_MINUTES,
-        extra_import_options={'trackDetailedModelStats': True})
-    logging.info('%s: Computing metrics took %.3f seconds.' % (
-        test_result['testPath'], time.time() - start))
-
-    if mre_result.failures:
-      test_result['status'] = 'FAIL'
-      for f in mre_result.failures:
-        logging.error('Failure recorded for test %s: %s',
-                      test_result['testPath'], f)
-
-    return mre_result.pairs.get('histograms', [])
-  except Exception:  # pylint: disable=broad-except
-    # logging exception here is the only way to get a stack trace since
-    # multiprocessing's pool implementation does not save that data. See
-    # crbug.com/953365.
-    logging.exception('%s: Exception while calculating metric' %
-                      test_result['testPath'])
-    raise
+  metrics = [tag['value'] for tag in test_result['tags']
+             if tag['key'] == 'tbmv2']
+  html_trace = test_result['outputArtifacts'][HTML_TRACE_NAME]
+  html_local_path = html_trace['filePath']
+  html_remote_url = html_trace.get('remoteUrl')
+
+  logging.info('%s: Starting to compute metrics on trace.',
+               test_result['testPath'])
+  start = time.time()
+  # The timeout needs to be coordinated with the Swarming IO timeout for the
+  # task that runs this code. If this timeout is longer or close in length
+  # to the swarming IO timeout then we risk being forcibly killed for not
+  # producing any output. Note that this could be fixed by periodically
+  # outputting logs while waiting for metrics to be calculated.
+  TEN_MINUTES = 60 * 10
+  mre_result = metric_runner.RunMetricOnSingleTrace(
+      html_local_path, metrics, canonical_url=html_remote_url,
+      timeout=TEN_MINUTES,
+      extra_import_options={'trackDetailedModelStats': True})
+  logging.info('%s: Computing metrics took %.3f seconds.' % (
+      test_result['testPath'], time.time() - start))
+
+  if mre_result.failures:
+    test_result['status'] = 'FAIL'
+    for f in mre_result.failures:
+      logging.error('Failure recorded for test %s: %s',
+                    test_result['testPath'], f)
+
+  return mre_result.pairs.get('histograms', [])
@@ -104,9 +96,6 @@ def ComputeTBMv2Metrics(intermediate_results):
     work_list.append(test_result)
 
-  if not work_list:
-    return histogram_dicts
-
   for dicts in util.ApplyInParallel(_PoolWorker, work_list):
     histogram_dicts += dicts
...
@@ -155,9 +155,6 @@ def UploadArtifacts(intermediate_results, upload_bucket, results_label):
       remote_name = '/'.join([run_identifier, result['testPath'], name])
      work_list.append((artifact, remote_name))
 
-  if not work_list:
-    return
-
   def PoolUploader(work_item):
     artifact, remote_name = work_item
     artifact['remoteUrl'] = cloud_storage.Insert(
...
@@ -18,6 +18,9 @@ def ApplyInParallel(function, work_list):
     A generator over results. The order of results might not match the
     order of the arguments in the work_list.
   """
+  if not work_list:
+    return
+
   try:
     # Note that this is speculatively halved as an attempt to fix
     # crbug.com/953365.
@@ -28,8 +31,18 @@
     cpu_count = 4
   pool = ThreadPool(min(cpu_count, len(work_list)))
 
+  def function_with_try(arg):
+    try:
+      return function(arg)
+    except Exception:  # pylint: disable=broad-except
+      # logging exception here is the only way to get a stack trace since
+      # multiprocessing's pool implementation does not save that data. See
+      # crbug.com/953365.
+      logging.exception('Exception while running %s' % function.__name__)
+      raise
+
   try:
-    for result in pool.imap_unordered(function, work_list):
+    for result in pool.imap_unordered(function_with_try, work_list):
       yield result
     pool.close()
     pool.join()
...
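
For reference, a self-contained sketch of the helper as it stands after this change; it is a simplified standalone rendering of the diff above (the fixed pool size of 4 and the condensed pool cleanup are assumptions of the sketch), not the exact Chromium implementation:

    import logging
    from multiprocessing.dummy import Pool as ThreadPool

    def ApplyInParallel(function, work_list):
      # Yields function(item) for each item in work_list, in arbitrary order.
      if not work_list:
        return

      def function_with_try(arg):
        try:
          return function(arg)
        except Exception:  # pylint: disable=broad-except
          # The pool does not preserve the worker's traceback, so log it here
          # before re-raising (see crbug.com/953365).
          logging.exception('Exception while running %s' % function.__name__)
          raise

      pool = ThreadPool(min(4, len(work_list)))
      try:
        for result in pool.imap_unordered(function_with_try, work_list):
          yield result
        pool.close()
        pool.join()
      finally:
        pool.terminate()

Because the wrapper re-raises after logging, exceptions still propagate out of the generator to the caller; the new unit test below checks exactly that.
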
@@ -13,3 +13,11 @@ class UtilTests(unittest.TestCase):
     fun = lambda x: x * x
     result = set(util.ApplyInParallel(fun, work_list))
     self.assertEqual(result, set([1, 4, 9]))
+
+  def testApplyInParallelExceptionRaised(self):
+    work_list = [1, 2, 3]
+    def fun(x):
+      if x == 3:
+        raise RuntimeError()
+    with self.assertRaises(RuntimeError):
+      list(util.ApplyInParallel(fun, work_list))