Commit 86308fbe authored by Mikhail Khokhlov, committed by Commit Bot

[tools/perf] Refactoring: parallel test result processing

Before this CL, Results Processor did its processing in several stages:
aggregating traces for all tests in parallel, then computing metrics for
all tests in parallel, and so on. This CL moves the parallelization to
the upper level, so that all processing for a particular test happens
inside a single thread. This allows us to:
1) Make the processing of tests independent, so that errors in one test
do not affect the others.
2) Mark tests that had failures as 'FAIL' in the final results.
3) Add test-specific diagnostics to histograms.

We also add support for the new intermediate results format, where there
are no benchmarkRun messages and all metadata is contained in testResults.
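
For context, a minimal, self-contained sketch of the new per-test flow.
The names ProcessTestResult and its simulated stage body are illustrative
only (the real stages are the compute_metrics, json3_output, processor and
util changes in the diff below); ApplyInParallel here mirrors the contract
of util.ApplyInParallel after this CL: apply a side-effecting function to
each item and call on_failure for items whose processing raised.

from multiprocessing.dummy import Pool as ThreadPool


def ApplyInParallel(function, work_list, on_failure=None):
  # Stand-in with the same contract as util.ApplyInParallel after this CL:
  # run `function` over work_list in a thread pool for its side effects and
  # call `on_failure(item)` for any item whose processing raises.
  if not work_list:
    return
  pool = ThreadPool(min(8, len(work_list)))

  def with_try(item):
    try:
      function(item)
    except Exception:  # pylint: disable=broad-except
      if on_failure:
        on_failure(item)

  try:
    pool.map(with_try, work_list)
  finally:
    pool.terminate()
    pool.join()


def ProcessTestResult(test_result):
  # Hypothetical per-test pipeline: all stages for one test run inside the
  # same worker thread, so an error here is isolated to this test_result.
  if 'trace.html' not in test_result.get('outputArtifacts', {}):
    raise ValueError('missing trace for %s' % test_result['testPath'])
  test_result['_histograms'] = ['histograms for %s' % test_result['testPath']]


test_results = [
    {'testPath': 'benchmark/story1', 'status': 'PASS',
     'outputArtifacts': {'trace.html': '/trace1.html'}},
    {'testPath': 'benchmark/story2', 'status': 'PASS',
     'outputArtifacts': {}},  # processing fails; marked 'FAIL' below
]
ApplyInParallel(ProcessTestResult, test_results,
                on_failure=lambda t: t.update(status='FAIL'))
for t in test_results:
  print('%s: %s' % (t['testPath'], t['status']))
# benchmark/story1: PASS
# benchmark/story2: FAIL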

Bug: 981349, 1015192
Change-Id: I75d036a3ded439e092ee7b892a26bc26f3600520
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1869212
Commit-Queue: Mikhail Khokhlov <khokhlov@google.com>
Reviewed-by: Juan Antonio Navarro Pérez <perezju@chromium.org>
Cr-Commit-Position: refs/heads/master@{#708235}
parent 6f2e929e
......@@ -7,8 +7,6 @@ import logging
import os
import time
from core.results_processor import util
from tracing.metrics import metric_runner
......@@ -24,7 +22,7 @@ HISTOGRAM_DICTS_KEY = 'histogram_dicts'
HISTOGRAM_DICTS_FILE = 'histogram_dicts.json'
def _PoolWorker(test_result):
def _RunMetric(test_result):
metrics = [tag['value'] for tag in test_result['tags']
if tag['key'] == 'tbmv2']
html_trace = test_result['outputArtifacts'][HTML_TRACE_NAME]
......@@ -56,7 +54,7 @@ def _PoolWorker(test_result):
return mre_result.pairs.get('histograms', [])
def ComputeTBMv2Metrics(intermediate_results):
def ComputeTBMv2Metrics(test_result):
"""Compute metrics on aggregated traces in parallel.
For each test run that has an aggregate trace and some TBMv2 metrics listed
......@@ -64,25 +62,22 @@ def ComputeTBMv2Metrics(intermediate_results):
histograms. Note: the order of histograms in the results may be different
from the order of tests in intermediate_results.
"""
histogram_dicts = []
work_list = []
for test_result in intermediate_results['testResults']:
artifacts = test_result.get('outputArtifacts', {})
# TODO(crbug.com/981349): If metrics have already been computed in
# Telemetry, we read it from the file. Remove this branch after Telemetry
# does not compute metrics anymore.
if HISTOGRAM_DICTS_FILE in artifacts:
with open(artifacts[HISTOGRAM_DICTS_FILE]['filePath']) as f:
histogram_dicts += json.load(f)
test_result['_histograms'].ImportDicts(json.load(f))
del artifacts[HISTOGRAM_DICTS_FILE]
continue
return
if test_result['status'] == 'SKIP':
continue
return
if (HTML_TRACE_NAME not in artifacts or
not any(tag['key'] == 'tbmv2' for tag in test_result.get('tags', []))):
continue
return
trace_size_in_mib = (os.path.getsize(artifacts[HTML_TRACE_NAME]['filePath'])
/ (2 ** 20))
......@@ -93,11 +88,6 @@ def ComputeTBMv2Metrics(intermediate_results):
test_result['status'] = 'FAIL'
logging.error('%s: Trace size is too big: %s MiB',
test_result['testPath'], trace_size_in_mib)
continue
work_list.append(test_result)
for dicts in util.ApplyInParallel(_PoolWorker, work_list):
histogram_dicts += dicts
return
return histogram_dicts
test_result['_histograms'].ImportDicts(_RunMetric(test_result))
......@@ -11,6 +11,7 @@ from tracing.mre import failure
from tracing.mre import job
from tracing.mre import mre_result
from tracing.value import histogram
from tracing.value import histogram_set
import mock
......@@ -21,22 +22,14 @@ GETSIZE_METHOD = 'os.path.getsize'
class ComputeMetricsTest(unittest.TestCase):
def testComputeTBMv2Metrics(self):
in_results = testing.IntermediateResults([
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story1',
output_artifacts={
compute_metrics.HTML_TRACE_NAME:
testing.Artifact('/trace1.html', 'gs://trace1.html')},
tags=['tbmv2:metric1'],
),
testing.TestResult(
'benchmark/story2',
output_artifacts={
compute_metrics.HTML_TRACE_NAME:
testing.Artifact('/trace2.html', 'gs://trace2.html')},
tags=['tbmv2:metric2'],
),
])
)
test_result['_histograms'] = histogram_set.HistogramSet()
test_dict = histogram.Histogram('a', 'unitless').AsDict()
metrics_result = mre_result.MreResult()
......@@ -46,42 +39,41 @@ class ComputeMetricsTest(unittest.TestCase):
with mock.patch(RUN_METRICS_METHOD) as run_metrics_mock:
getsize_mock.return_value = 1000
run_metrics_mock.return_value = metrics_result
histogram_dicts = compute_metrics.ComputeTBMv2Metrics(in_results)
compute_metrics.ComputeTBMv2Metrics(test_result)
self.assertEqual(histogram_dicts, [test_dict, test_dict])
self.assertEqual(in_results['testResults'][0]['status'], 'PASS')
self.assertEqual(in_results['testResults'][1]['status'], 'PASS')
histogram_dicts = test_result['_histograms'].AsDicts()
self.assertEqual(histogram_dicts, [test_dict])
self.assertEqual(test_result['status'], 'PASS')
def testComputeTBMv2MetricsTraceTooBig(self):
in_results = testing.IntermediateResults([
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story1',
output_artifacts={
compute_metrics.HTML_TRACE_NAME:
testing.Artifact('/trace1.html', 'gs://trace1.html')},
tags=['tbmv2:metric1'],
),
])
)
test_result['_histograms'] = histogram_set.HistogramSet()
with mock.patch(GETSIZE_METHOD) as getsize_mock:
with mock.patch(RUN_METRICS_METHOD) as run_metrics_mock:
getsize_mock.return_value = 1e9
histogram_dicts = compute_metrics.ComputeTBMv2Metrics(in_results)
compute_metrics.ComputeTBMv2Metrics(test_result)
self.assertEqual(run_metrics_mock.call_count, 0)
histogram_dicts = test_result['_histograms'].AsDicts()
self.assertEqual(histogram_dicts, [])
self.assertEqual(in_results['testResults'][0]['status'], 'FAIL')
self.assertEqual(test_result['status'], 'FAIL')
def testComputeTBMv2MetricsFailure(self):
in_results = testing.IntermediateResults([
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story1',
output_artifacts={
compute_metrics.HTML_TRACE_NAME:
testing.Artifact('/trace1.html', 'gs://trace1.html')},
tags=['tbmv2:metric1'],
),
])
)
test_result['_histograms'] = histogram_set.HistogramSet()
metrics_result = mre_result.MreResult()
metrics_result.AddFailure(failure.Failure(job.Job(0), 0, 0, 0, 0, 0))
......@@ -90,26 +82,27 @@ class ComputeMetricsTest(unittest.TestCase):
with mock.patch(RUN_METRICS_METHOD) as run_metrics_mock:
getsize_mock.return_value = 100
run_metrics_mock.return_value = metrics_result
histogram_dicts = compute_metrics.ComputeTBMv2Metrics(in_results)
compute_metrics.ComputeTBMv2Metrics(test_result)
histogram_dicts = test_result['_histograms'].AsDicts()
self.assertEqual(histogram_dicts, [])
self.assertEqual(in_results['testResults'][0]['status'], 'FAIL')
self.assertEqual(test_result['status'], 'FAIL')
def testComputeTBMv2MetricsSkipped(self):
in_results = testing.IntermediateResults([
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story1',
output_artifacts={
compute_metrics.HTML_TRACE_NAME:
testing.Artifact('/trace1.html', 'gs://trace1.html')},
tags=['tbmv2:metric1'],
status='SKIP',
),
])
)
test_result['_histograms'] = histogram_set.HistogramSet()
with mock.patch(RUN_METRICS_METHOD) as run_metrics_mock:
histogram_dicts = compute_metrics.ComputeTBMv2Metrics(in_results)
compute_metrics.ComputeTBMv2Metrics(test_result)
self.assertEqual(run_metrics_mock.call_count, 0)
histogram_dicts = test_result['_histograms'].AsDicts()
self.assertEqual(histogram_dicts, [])
self.assertEqual(in_results['testResults'][0]['status'], 'SKIP')
self.assertEqual(test_result['status'], 'SKIP')
......@@ -9,6 +9,7 @@ https://chromium.googlesource.com/chromium/src/+/master/docs/testing/json_test_r
"""
import collections
import datetime
import json
import os
import urllib
......@@ -19,18 +20,18 @@ from core.results_processor import util
OUTPUT_FILENAME = 'test-results.json'
def ProcessIntermediateResults(intermediate_results, options):
def ProcessIntermediateResults(test_results, options):
"""Process intermediate results and write output in output_dir."""
results = Convert(intermediate_results, options.output_dir)
results = Convert(test_results, options.output_dir)
with open(os.path.join(options.output_dir, OUTPUT_FILENAME), 'w') as f:
json.dump(results, f, sort_keys=True, indent=4, separators=(',', ': '))
def Convert(in_results, base_dir):
def Convert(test_results, base_dir):
"""Convert intermediate results to the JSON Test Results Format.
Args:
in_results: The parsed intermediate results.
test_results: The parsed intermediate results.
base_dir: A string with the path to a base directory; artifact file paths
will be written relative to this.
......@@ -40,7 +41,7 @@ def Convert(in_results, base_dir):
results = {'tests': {}}
status_counter = collections.Counter()
for result in in_results['testResults']:
for result in test_results:
benchmark_name, story_name = result['testPath'].split('/')
story_name = urllib.unquote(story_name)
actual_status = result['status']
......@@ -77,10 +78,17 @@ def Convert(in_results, base_dir):
if test['shard'] is None:
del test['shard']
benchmark_run = in_results['benchmarkRun']
# Test results are written in order of execution, so the first test start
# time is approximately the start time of the whole suite.
test_suite_start_time = (test_results[0]['startTime'] if test_results
else datetime.datetime.utcnow().isoformat() + 'Z')
# If Telemetry stops with an unhandleable error, the remaining stories
# are marked as unexpectedly skipped.
interrupted = any(t['status'] == 'SKIP' and not t['isExpected']
for t in test_results)
results.update(
seconds_since_epoch=util.IsoTimestampToEpoch(benchmark_run['startTime']),
interrupted=benchmark_run['interrupted'],
seconds_since_epoch=util.IsoTimestampToEpoch(test_suite_start_time),
interrupted=interrupted,
num_failures_by_type=dict(status_counter),
path_delimiter='/',
version=3,
......
......@@ -13,13 +13,11 @@ class Json3OutputTest(unittest.TestCase):
def setUp(self):
self.base_dir = 'base_dir'
def Convert(self, test_results, **kwargs):
base_dir = kwargs.pop('base_dir', self.base_dir)
original_results = testing.IntermediateResults(test_results, **kwargs)
intermediate_results = copy.deepcopy(original_results)
results = json3_output.Convert(intermediate_results, base_dir)
def Convert(self, test_results):
test_results_copy = copy.deepcopy(test_results)
results = json3_output.Convert(test_results_copy, self.base_dir)
# Convert should not modify the original intermediate results.
self.assertEqual(intermediate_results, original_results)
self.assertEqual(test_results_copy, test_results)
return results
def FindTestResult(self, results, benchmark, story):
......@@ -29,15 +27,15 @@ class Json3OutputTest(unittest.TestCase):
node = node[key]
return node
def testEmptyResults(self):
results = self.Convert(
[], start_time='2009-02-13T23:31:30.987000Z', interrupted=False)
def testStartTime(self):
results = self.Convert([
testing.TestResult('benchmark/story',
start_time='2009-02-13T23:31:30.987000Z')
])
self.assertFalse(results['interrupted'])
self.assertEqual(results['num_failures_by_type'], {})
self.assertEqual(results['path_delimiter'], '/')
self.assertEqual(results['seconds_since_epoch'], 1234567890.987)
self.assertEqual(results['tests'], {})
self.assertEqual(results['version'], 3)
def testSingleTestCase(self):
......
......@@ -4,6 +4,7 @@
"""Unit tests for results_processor methods."""
import datetime
import os
import unittest
......@@ -12,101 +13,65 @@ import mock
from core.results_processor import processor
from core.results_processor import testing
from tracing.value import histogram
from tracing.value.diagnostics import generic_set
from tracing.value.diagnostics import date_range
from tracing.value import histogram_set
class ResultsProcessorUnitTests(unittest.TestCase):
def testAddDiagnosticsToHistograms(self):
histogram_dicts = [histogram.Histogram('a', 'unitless').AsDict()]
in_results = testing.IntermediateResults(
test_results=[],
diagnostics={
'benchmarks': ['benchmark'],
'osNames': ['linux'],
'documentationUrls': [['documentation', 'url']],
},
)
test_result = testing.TestResult('benchmark/story')
test_result['_histograms'] = histogram_set.HistogramSet()
test_result['_histograms'].CreateHistogram('a', 'unitless', [0])
start_ts = 1500000000
start_iso = datetime.datetime.utcfromtimestamp(start_ts).isoformat() + 'Z'
histograms_with_diagnostics = processor.AddDiagnosticsToHistograms(
histogram_dicts, in_results, results_label='label')
out_histograms = histogram_set.HistogramSet()
out_histograms.ImportDicts(histograms_with_diagnostics)
diag_values = [list(v) for v in out_histograms.shared_diagnostics]
self.assertEqual(len(diag_values), 4)
self.assertIn(['benchmark'], diag_values)
self.assertIn(['linux'], diag_values)
self.assertIn([['documentation', 'url']], diag_values)
self.assertIn(['label'], diag_values)
processor.AddDiagnosticsToHistograms(
test_result, test_suite_start=start_iso, results_label='label')
hist = test_result['_histograms'].GetFirstHistogram()
self.assertEqual(hist.diagnostics['labels'],
generic_set.GenericSet(['label']))
self.assertEqual(hist.diagnostics['benchmarkStart'],
date_range.DateRange(start_ts * 1e3))
def testUploadArtifacts(self):
in_results = testing.IntermediateResults(
test_results=[
testing.TestResult(
'benchmark/story',
output_artifacts={'log': testing.Artifact('/log.log')},
),
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story',
output_artifacts={
'logs': testing.Artifact('/log.log'),
'trace.html': testing.Artifact('/trace.html'),
'screenshot': testing.Artifact('/screenshot.png'),
},
),
],
)
with mock.patch('py_utils.cloud_storage.Insert') as cloud_patch:
cloud_patch.return_value = 'gs://url'
processor.UploadArtifacts(in_results, 'bucket', None)
processor.UploadArtifacts(test_result, 'bucket', 'run1')
cloud_patch.assert_has_calls([
mock.call('bucket', mock.ANY, '/log.log'),
mock.call('bucket', mock.ANY, '/trace.html'),
mock.call('bucket', mock.ANY, '/screenshot.png'),
mock.call('bucket', 'run1/benchmark/story/logs', '/log.log'),
mock.call('bucket', 'run1/benchmark/story/trace.html', '/trace.html'),
mock.call('bucket', 'run1/benchmark/story/screenshot',
'/screenshot.png'),
],
any_order=True,
)
for result in in_results['testResults']:
for artifact in result['outputArtifacts'].itervalues():
for artifact in test_result['outputArtifacts'].itervalues():
self.assertEqual(artifact['remoteUrl'], 'gs://url')
def testUploadArtifacts_CheckRemoteUrl(self):
in_results = testing.IntermediateResults(
test_results=[
testing.TestResult(
'benchmark/story',
output_artifacts={
'trace.html': testing.Artifact('/trace.html')
},
),
],
start_time='2019-10-01T12:00:00.123456Z',
)
with mock.patch('py_utils.cloud_storage.Insert') as cloud_patch:
def testRunIdentifier(self):
with mock.patch('random.randint') as randint_patch:
randint_patch.return_value = 54321
processor.UploadArtifacts(in_results, 'bucket', 'src@abc + 123')
cloud_patch.assert_called_once_with(
'bucket',
'src_abc_123_20191001T120000_54321/benchmark/story/trace.html',
'/trace.html'
)
run_identifier = processor.RunIdentifier(
results_label='src@abc + 123',
test_suite_start='2019-10-01T12:00:00.123456Z')
self.assertEqual(run_identifier, 'src_abc_123_20191001T120000_54321')
def testAggregateTraces(self):
in_results = testing.IntermediateResults(
test_results=[
testing.TestResult(
'benchmark/story1',
output_artifacts={
'trace/1.json': testing.Artifact(
os.path.join('test_run', 'story1', 'trace', '1.json')),
},
),
testing.TestResult(
test_result = testing.TestResult(
'benchmark/story2',
output_artifacts={
'trace/1.json': testing.Artifact(
......@@ -114,37 +79,27 @@ class ResultsProcessorUnitTests(unittest.TestCase):
'trace/2.json': testing.Artifact(
os.path.join('test_run', 'story2', 'trace', '2.json')),
},
),
],
)
with mock.patch('tracing.trace_data.trace_data.SerializeAsHtml') as patch:
processor.AggregateTraces(in_results)
call_list = [list(call[0]) for call in patch.call_args_list]
self.assertEqual(len(call_list), 2)
for call in call_list:
call[0] = set(call[0])
self.assertIn(
[
set([os.path.join('test_run', 'story1', 'trace', '1.json')]),
os.path.join('test_run', 'story1', 'trace', 'trace.html'),
],
call_list
)
self.assertIn(
[
serialize_method = 'tracing.trace_data.trace_data.SerializeAsHtml'
with mock.patch(serialize_method) as mock_serialize:
processor.AggregateTraces(test_result)
self.assertEqual(mock_serialize.call_count, 1)
trace_files, file_path = mock_serialize.call_args[0][:2]
self.assertEqual(
set(trace_files),
set([
os.path.join('test_run', 'story2', 'trace', '1.json'),
os.path.join('test_run', 'story2', 'trace', '2.json'),
]),
)
self.assertEqual(
file_path,
os.path.join('test_run', 'story2', 'trace', 'trace.html'),
],
call_list
)
for result in in_results['testResults']:
artifacts = result['outputArtifacts']
artifacts = test_result['outputArtifacts']
self.assertEqual(len(artifacts), 1)
self.assertEqual(artifacts.keys()[0], 'trace.html')
......
......@@ -7,33 +7,6 @@
import json
_BENCHMARK_START_KEYS = set(['startTime'])
def IntermediateResults(test_results, start_time='2015-10-21T07:28:00.000Z',
finalized=True, interrupted=False, diagnostics=None):
"""Build a dict of 'parsed' intermediate results.
Args:
test_results: A sequence of testResult dicts.
start_time: An optional UTC timestamp recording when a benchmark started
running.
finalized: An optional bool indicating whether the benchmark run finalized.
Defaults to True.
interrupted: An optional bool indicating whether the benchmark run was
interrupted. Defaults to False.
"""
return {
'benchmarkRun': {
'startTime': start_time,
'finalized': finalized,
'interrupted': interrupted,
'diagnostics': diagnostics or {},
},
'testResults': list(test_results)
}
def TestResult(test_path, status='PASS', is_expected=None,
start_time='2015-10-21T07:28:00.000Z', run_duration='1.00s',
output_artifacts=None, tags=None):
......@@ -97,29 +70,16 @@ def SerializeIntermediateResults(in_results, filepath):
"""Serialize intermediate results to a filepath.
Args:
in_results: A dict with intermediate results, e.g. as produced by
IntermediateResults or parsed from an intermediate results file.
filpath: A file path where to serialize the intermediate results.
in_results: A list of test results.
filepath: A file path where to serialize the intermediate results.
"""
# Split benchmarkRun into fields recorded at startup and when finishing.
benchmark_start = {}
benchmark_finish = {}
for key, value in in_results['benchmarkRun'].items():
d = benchmark_start if key in _BENCHMARK_START_KEYS else benchmark_finish
d[key] = value
# Serialize individual records as a sequence of json lines.
with open(filepath, 'w') as fp:
_SerializeRecord({'benchmarkRun': benchmark_start}, fp)
for test_result in in_results['testResults']:
_SerializeRecord({'testResult': test_result}, fp)
_SerializeRecord({'benchmarkRun': benchmark_finish}, fp)
for test_result in in_results:
json.dump({'testResult': test_result}, fp,
sort_keys=True, separators=(',', ':'))
fp.write('\n')
def _SplitTag(tag):
key, value = tag.split(':', 1)
return {'key': key, 'value': value}
def _SerializeRecord(record, fp):
fp.write(json.dumps(record, sort_keys=True, separators=(',', ':')) + '\n')
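
For illustration, a minimal sketch of the new intermediate results format
that this helper now writes: one JSON record per line, each wrapping a
single testResult, with no benchmarkRun records. The file name and field
values below are made up.

import json

test_results = [
    {'testPath': 'benchmark/story1', 'status': 'PASS'},
    {'testPath': 'benchmark/story2', 'status': 'FAIL'},
]

# Write: one {'testResult': ...} record per line, as in testing.py above.
with open('intermediate_results.jsonl', 'w') as fp:
  for test_result in test_results:
    json.dump({'testResult': test_result}, fp,
              sort_keys=True, separators=(',', ':'))
    fp.write('\n')

# Read back: every line is self-contained, and all metadata lives in the
# testResult itself, so no separate benchmarkRun record is needed.
with open('intermediate_results.jsonl') as fp:
  parsed = [json.loads(line)['testResult'] for line in fp]
assert [r['testPath'] for r in parsed] == ['benchmark/story1',
                                           'benchmark/story2']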
......@@ -9,16 +9,13 @@ import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
def ApplyInParallel(function, work_list):
def ApplyInParallel(function, work_list, on_failure=None):
"""Apply a function to all values in work_list in parallel.
Args:
function: A function with one argument.
work_list: Any iterable with arguments for the function.
Returns:
A generator over results. The order of results might not match the
order of the arguments in the work_list.
on_failure: A function to run in case of a failure.
"""
if not work_list:
return
......@@ -35,17 +32,17 @@ def ApplyInParallel(function, work_list):
def function_with_try(arg):
try:
return function(arg)
function(arg)
except Exception: # pylint: disable=broad-except
# logging exception here is the only way to get a stack trace since
# multiprocessing's pool implementation does not save that data. See
# crbug.com/953365.
logging.exception('Exception while running %s' % function.__name__)
raise
if on_failure:
on_failure(arg)
try:
for result in pool.imap_unordered(function_with_try, work_list):
yield result
pool.imap_unordered(function_with_try, work_list)
pool.close()
pool.join()
finally:
......
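
A short usage sketch of the new on_failure hook, assuming
core.results_processor.util is importable; the work function mutates its
argument in place (ApplyInParallel no longer yields results), and the
callback simply records which items failed.

from core.results_processor import util

failed = []

def square_in_place(item):
  if item['value'] < 0:
    raise ValueError('negative input not allowed in this example')
  item['value'] **= 2

work_list = [{'value': 2}, {'value': -1}, {'value': 3}]
util.ApplyInParallel(square_in_place, work_list,
                     on_failure=failed.append)
# work_list is now [{'value': 4}, {'value': -1}, {'value': 9}] and
# failed contains the single item that raised: [{'value': -1}].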
......@@ -9,15 +9,16 @@ from core.results_processor import util
class UtilTests(unittest.TestCase):
def testApplyInParallel(self):
work_list = [1, 2, 3]
fun = lambda x: x * x
result = set(util.ApplyInParallel(fun, work_list))
self.assertEqual(result, set([1, 4, 9]))
work_list = [[1], [2], [3]]
def fun(x):
x.extend(x)
util.ApplyInParallel(fun, work_list)
self.assertEqual(work_list, [[1, 1], [2, 2], [3, 3]])
def testApplyInParallelExceptionRaised(self):
work_list = [1, 2, 3]
def testApplyInParallelOnFailure(self):
work_list = [[1], [2], [3]]
def fun(x):
if x == 3:
if x == [3]:
raise RuntimeError()
with self.assertRaises(RuntimeError):
list(util.ApplyInParallel(fun, work_list))
util.ApplyInParallel(fun, work_list, on_failure=lambda x: x.pop())
self.assertEqual(work_list, [[1], [2], []])