Commit 15caa536 authored by Mikhail Khokhlov, committed by Commit Bot

[tools/perf] Upload artifacts in parallel

This CL speeds up uploading test artifacts to the cloud by doing the
uploads in a thread pool. It also fixes a bug where run_identifier was
recomputed for every artifact (drawing a new random suffix each time)
and so would change during a single script run.

Bug: 981349
Change-Id: I65fea156d73fb568f8ecb3e05d330971ab55be29
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1837630
Commit-Queue: Mikhail Khokhlov <khokhlov@google.com>
Reviewed-by: Juan Antonio Navarro Pérez <perezju@chromium.org>
Cr-Commit-Position: refs/heads/master@{#702787}
parent b58ea26a
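For context before the diff: the CL extracts the thread-pool fan-out that ComputeTBMv2Metrics already used into a shared helper, util.ApplyInParallel, and reuses it for artifact uploads. A minimal standalone sketch of that pattern, with a made-up _Square worker (illustration only, not code from this CL):

from multiprocessing.dummy import Pool as ThreadPool  # threads, not processes

def _Square(x):
  return x * x

work_list = [1, 2, 3, 4]
pool = ThreadPool(min(4, len(work_list)))
try:
  # imap_unordered yields results as workers finish, in arbitrary order.
  for result in pool.imap_unordered(_Square, work_list):
    print(result)
  pool.close()
  pool.join()
finally:
  pool.terminate()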
tools/perf/core/results_processor/compute_metrics.py

@@ -4,11 +4,11 @@
 import json
 import logging
-import multiprocessing
-from multiprocessing.dummy import Pool as ThreadPool
 import os
 import time
 
+from core.results_processor import util
+
 from tracing.metrics import metric_runner
...@@ -101,22 +101,7 @@ def ComputeTBMv2Metrics(intermediate_results): ...@@ -101,22 +101,7 @@ def ComputeTBMv2Metrics(intermediate_results):
if not work_list: if not work_list:
return histogram_dicts return histogram_dicts
try: for dicts in util.ApplyInParallel(_PoolWorker, work_list):
# Note that this is speculatively halved as an attempt to fix histogram_dicts += dicts
# crbug.com/953365.
cpu_count = multiprocessing.cpu_count() / 2
except NotImplementedError:
# Some platforms can raise a NotImplementedError from cpu_count()
logging.warning('cpu_count() not implemented.')
cpu_count = 4
pool = ThreadPool(min(cpu_count, len(work_list)))
try:
for dicts in pool.imap_unordered(_PoolWorker, work_list):
histogram_dicts += dicts
pool.close()
pool.join()
finally:
pool.terminate()
return histogram_dicts return histogram_dicts
tools/perf/core/results_processor/processor.py

@@ -18,6 +18,7 @@ from py_utils import cloud_storage
 from core.results_processor import command_line
 from core.results_processor import compute_metrics
 from core.results_processor import formatters
+from core.results_processor import util
 
 from tracing.value.diagnostics import generic_set
 from tracing.value.diagnostics import reserved_infos
@@ -93,10 +94,8 @@ def _AggregateTraces(intermediate_results):
       del artifacts[trace]
 
 
-def _RemoteName(results_label, start_time, test_path, artifact_name):
-  """Construct a name for a given artifact, under which it will be
-  stored in the cloud.
-  """
+def _RunIdentifier(results_label, start_time):
+  """Construct an identifier for the current script run."""
   if results_label:
     identifier_parts = [re.sub(r'\W+', '_', results_label)]
   else:
@@ -105,8 +104,7 @@ def _RemoteName(results_label, start_time, test_path, artifact_name):
   # The first 19 chars of the string match 'YYYY-MM-DDTHH:MM:SS'.
   identifier_parts.append(re.sub(r'\W+', '', start_time[:19]))
   identifier_parts.append(str(random.randint(1, 1e5)))
-  run_identifier = '_'.join(identifier_parts)
-  return '/'.join([run_identifier, test_path, artifact_name])
+  return '_'.join(identifier_parts)
 
 
 def UploadArtifacts(intermediate_results, upload_bucket, results_label):
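As an aside, a worked example of the identifier the new _RunIdentifier helper produces (sample values invented; the 1..1e5 random suffix keeps runs distinct):

import re
import random

results_label = 'my benchmark run'        # sample label
start_time = '2019-10-02T12:34:56.789Z'   # sample benchmarkRun startTime
identifier_parts = [re.sub(r'\W+', '_', results_label)]
identifier_parts.append(re.sub(r'\W+', '', start_time[:19]))
identifier_parts.append(str(random.randint(1, 100000)))
print('_'.join(identifier_parts))  # e.g. my_benchmark_run_20191002T123456_42

Because the random suffix is now drawn once per run rather than once per artifact (as the old _RemoteName did), all artifacts from one run share the same prefix; that is the run_identifier fix mentioned in the CL description.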
@@ -118,7 +116,10 @@ def UploadArtifacts(intermediate_results, upload_bucket, results_label):
   if upload_bucket is None:
     return
 
-  start_time = intermediate_results['benchmarkRun']['startTime']
+  run_identifier = _RunIdentifier(
+      results_label, intermediate_results['benchmarkRun']['startTime'])
+  work_list = []
+
   for result in intermediate_results['testResults']:
     artifacts = result.get('artifacts', {})
     for name, artifact in artifacts.iteritems():
@@ -128,13 +129,25 @@ def UploadArtifacts(intermediate_results, upload_bucket, results_label):
       # save histograms as an artifact anymore.
       if name == compute_metrics.HISTOGRAM_DICTS_FILE:
         continue
-      artifact['remoteUrl'] = cloud_storage.Insert(
-          upload_bucket,
-          _RemoteName(results_label, start_time, result['testPath'], name),
-          artifact['filePath'],
-      )
-      logging.info('Uploaded %s of %s to %s\n' % (
-          name, result['testPath'], artifact['remoteUrl']))
+      remote_name = '/'.join([run_identifier, result['testPath'], name])
+      work_list.append((artifact, remote_name))
+
+  if not work_list:
+    return
+
+  def PoolUploader(work_item):
+    artifact, remote_name = work_item
+    artifact['remoteUrl'] = cloud_storage.Insert(
+        upload_bucket, remote_name, artifact['filePath'])
+
+  for _ in util.ApplyInParallel(PoolUploader, work_list):
+    pass
+
+  for result in intermediate_results['testResults']:
+    artifacts = result.get('artifacts', {})
+    for name, artifact in artifacts.iteritems():
+      logging.info('Uploaded %s of %s to %s', name, result['testPath'],
+                   artifact['remoteUrl'])
 
 
 def _ComputeMetrics(intermediate_results, results_label):
tools/perf/core/results_processor/util.py (new file)

# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool


def ApplyInParallel(function, work_list):
  """Apply a function to all values in work_list in parallel.

  Args:
    function: A function with one argument.
    work_list: Any iterable with arguments for the function.

  Returns:
    A generator over results. The order of results might not match the
    order of the arguments in the work_list.
  """
  try:
    # Note that this is speculatively halved as an attempt to fix
    # crbug.com/953365.
    cpu_count = multiprocessing.cpu_count() / 2
  except NotImplementedError:
    # Some platforms can raise a NotImplementedError from cpu_count().
    logging.warning('cpu_count() not implemented.')
    cpu_count = 4
  pool = ThreadPool(min(cpu_count, len(work_list)))
  try:
    for result in pool.imap_unordered(function, work_list):
      yield result
    pool.close()
    pool.join()
  finally:
    pool.terminate()
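One subtlety: ApplyInParallel is a generator, so no work starts until it is consumed; callers that only want side effects must still drain it, which is why UploadArtifacts above loops over it with a bare pass. A small usage sketch (hypothetical _Upload worker and file names, for illustration only):

from core.results_processor import util

def _Upload(work_item):
  # Hypothetical side-effect-only worker.
  name, path = work_item
  print('uploading %s from %s' % (name, path))

work_list = [('trace.html', '/tmp/trace.html'), ('logs.txt', '/tmp/logs.txt')]

# Results are discarded; draining the generator forces all uploads to finish.
for _ in util.ApplyInParallel(_Upload, work_list):
  pass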
tools/perf/core/results_processor/util_unittest.py (new file)

# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import unittest

from core.results_processor import util


class UtilTests(unittest.TestCase):
  def testApplyInParallel(self):
    work_list = [1, 2, 3]
    fun = lambda x: x * x
    result = set(util.ApplyInParallel(fun, work_list))
    self.assertEqual(result, set([1, 4, 9]))