Commit 681ba107 authored by Emily Hanley, committed by Commit Bot

Parallelizing perf dashboard uploads

This change also updates oauth token generation from one token
per run to one per benchmark.

Bug:713357,854162

CQ_INCLUDE_TRYBOTS=master.tryserver.chromium.perf:obbs_fyi

Change-Id: If06e71c53fe8083f82307584a6e92104e33b2f65
Reviewed-on: https://chromium-review.googlesource.com/1114690
Commit-Queue: Ned Nguyen <nednguyen@google.com>
Reviewed-by: Ned Nguyen <nednguyen@google.com>
Cr-Commit-Position: refs/heads/master@{#571212}
parent d15b5ec0
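
For context, the fan-out this change introduces is the standard multiprocessing pattern: build one argument tuple per benchmark, then map a single-argument wrapper over a process pool. A minimal standalone sketch of that pattern, with hypothetical names rather than this CL's code:

  import multiprocessing as mp

  def _upload_one(params):
    # Pool.map passes exactly one argument, so unpack the tuple here.
    benchmark, results_dir = params
    # ... upload results_dir for this benchmark ...
    return (benchmark, False)  # (name, failure flag)

  def upload_all(benchmark_dirs):
    # One worker per CPU; each benchmark uploads in its own process.
    pool = mp.Pool(mp.cpu_count())
    try:
      outcomes = pool.map(_upload_one, sorted(benchmark_dirs.items()))
    finally:
      pool.close()
      pool.join()
    # Collapse the per-benchmark failure flags into one exit code.
    return 1 if any(failed for _, failed in outcomes) else 0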
@@ -13,13 +13,13 @@ import tempfile
 
 @contextlib.contextmanager
-def with_access_token(service_account_json):
+def with_access_token(service_account_json, append):
   """Yields an access token for the service account.
 
   Args:
     service_account_json: The path to the service account JSON file.
   """
-  fd, path = tempfile.mkstemp(suffix='.json', prefix='tok')
+  fd, path = tempfile.mkstemp(suffix='.json', prefix=append)
   try:
     args = ['luci-auth', 'token']
     if service_account_json:
...
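
The new append argument only feeds the prefix of tempfile.mkstemp, so each parallel upload names its token file after its benchmark. mkstemp already guarantees a unique path even when prefixes collide, so the per-benchmark prefix is mainly about making stray token files attributable while debugging. A hypothetical call site, assuming the context manager yields the token file path:

  # '%s_tok' mirrors the prefix format used later in this CL.
  with oauth_api.with_access_token(
      service_account_json, '%s_tok' % benchmark_name) as token_file:
    upload_results(token_file)  # hypothetical consumer of the token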
@@ -19,6 +19,7 @@ import sys
 import traceback
 import urllib
 import urllib2
+import uuid
 import zlib
 
 from core import path_util
@@ -87,9 +88,10 @@ def SendResults(data, url, tmp_dir,
 
 def _GetCacheFileName(tmp_dir):
   """Gets the cache filename, creating the file if it does not exist."""
-  cache_dir = os.path.join(os.path.abspath(tmp_dir), CACHE_DIR)
+  # Since this is multi-processed, add a unique identifier to the cache dir
+  cache_dir = os.path.join(os.path.abspath(tmp_dir), CACHE_DIR, str(uuid.uuid4()))
   if not os.path.exists(cache_dir):
     os.makedirs(cache_dir)
   cache_filename = os.path.join(cache_dir, CACHE_FILENAME)
   if not os.path.exists(cache_filename):
     # Create the file.
...
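
Suffixing the cache path with str(uuid.uuid4()) gives every uploader process a private cache directory, so concurrent uploads cannot clobber each other's cache file; the trade-off is that the cache no longer persists between runs. The isolation idiom, reduced to a sketch ('run_data' stands in for CACHE_DIR; the helper name is hypothetical):

  import os
  import uuid

  def per_process_cache_dir(tmp_dir):
    # uuid4 is random per call, so two processes practically never
    # collide and no cross-process locking is needed.
    cache_dir = os.path.join(
        os.path.abspath(tmp_dir), 'run_data', str(uuid.uuid4()))
    if not os.path.exists(cache_dir):
      os.makedirs(cache_dir)
    return cache_dir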
@@ -5,6 +5,7 @@
 import argparse
 import json
+import multiprocessing as mp
 import os
 import shutil
 import sys
@@ -64,7 +65,7 @@ def _GetMachineGroup(build_properties):
 def _upload_perf_results(json_to_upload, name, configuration_name,
     build_properties, oauth_file, tmp_dir, output_json_file):
   """Upload the contents of result JSON(s) to the perf dashboard."""
-  args = [
+  args= [
     '--tmp-dir', tmp_dir,
     '--buildername', build_properties['buildername'],
     '--buildnumber', build_properties['buildnumber'],
@@ -87,7 +88,6 @@ def _upload_perf_results(json_to_upload, name, configuration_name,
   return upload_results_to_perf_dashboard.main(args)
 
-
 def _is_histogram(json_file):
   with open(json_file) as f:
     data = json.load(f)
...
@@ -322,6 +322,49 @@ def _merge_perf_results(benchmark_name, results_filename, directories):
     begin_time, end_time)
 
 
+def _upload_individual(
+    benchmark_name, directories, configuration_name,
+    build_properties, output_json_file, service_account_file):
+  tmpfile_dir = tempfile.mkdtemp('resultscache')
+  try:
+    upload_begin_time = time.time()
+    # There are potentially multiple directories with results; re-write and
+    # merge them if necessary.
+    results_filename = None
+    if len(directories) > 1:
+      merge_perf_dir = os.path.join(
+          os.path.abspath(tmpfile_dir), benchmark_name)
+      if not os.path.exists(merge_perf_dir):
+        os.makedirs(merge_perf_dir)
+      results_filename = os.path.join(
+          merge_perf_dir, 'merged_perf_results.json')
+      _merge_perf_results(benchmark_name, results_filename, directories)
+    else:
+      # The results were only written to one shard; use that shard's data.
+      results_filename = join(directories[0], 'perf_results.json')
+
+    print 'Uploading perf results from %s benchmark' % benchmark_name
+    # We generate an oauth token for every benchmark upload in case the
+    # token times out; see crbug.com/854162.
+    with oauth_api.with_access_token(
+        service_account_file, ("%s_tok" % benchmark_name)) as oauth_file:
+      with open(output_json_file, 'w') as oj:
+        upload_fail = _upload_perf_results(
+            results_filename,
+            benchmark_name, configuration_name, build_properties,
+            oauth_file, tmpfile_dir, oj)
+        upload_end_time = time.time()
+        print_duration(('%s upload time' % (benchmark_name)),
+                       upload_begin_time, upload_end_time)
+        return (benchmark_name, upload_fail)
+  finally:
+    shutil.rmtree(tmpfile_dir)
+
+
+def _upload_individual_benchmark(params):
+  return _upload_individual(*params)
+
+
 def _handle_perf_results(
     benchmark_enabled_map, benchmark_directory_map, configuration_name,
     build_properties, service_account_file, extra_links):
@@ -335,45 +378,52 @@ def _handle_perf_results(
     0 if this upload to the perf dashboard succeeded, 1 otherwise.
   """
   begin_time = time.time()
-  tmpfile_dir = tempfile.mkdtemp('resultscache')
+  tmpfile_dir = tempfile.mkdtemp('outputresults')
   try:
     # Upload all eligible benchmarks to the perf dashboard
+    results_dict = {}
+    invocations = []
+    for benchmark_name, directories in benchmark_directory_map.iteritems():
+      if not benchmark_enabled_map[benchmark_name]:
+        continue
+      # Create a place to write the perf results that will be written out
+      # to logdog.
+      output_json_file = os.path.join(
+          os.path.abspath(tmpfile_dir), (str(uuid.uuid4()) + benchmark_name))
+      results_dict[benchmark_name] = output_json_file
+      invocations.append((
+          benchmark_name, directories, configuration_name,
+          build_properties, output_json_file, service_account_file))
+
+    # Kick off the uploads in multiple processes
+    cpus = mp.cpu_count()
+    pool = mp.Pool(cpus)
+    result_iterator = pool.map(_upload_individual_benchmark,
+                               invocations)
+
+    # Keep a mapping of benchmarks to their upload results
+    benchmark_upload_result_map = {}
+    for result in result_iterator:
+      benchmark_upload_result_map[result[0]] = bool(result[1])
+
     logdog_dict = {}
+    upload_failure = False
     logdog_stream = None
     logdog_label = 'Results Dashboard'
-    upload_failure = False
-    with oauth_api.with_access_token(service_account_file) as oauth_file:
-      for benchmark_name, directories in benchmark_directory_map.iteritems():
-        if not benchmark_enabled_map[benchmark_name]:
-          continue
-        upload_begin_time = time.time()
-        # There are potentially multiple directores with results, re-write and
-        # merge them if necessary
-        results_filename = None
-        if len(directories) > 1:
-          merge_perf_dir = os.path.join(
-              os.path.abspath(tmpfile_dir), benchmark_name)
-          if not os.path.exists(merge_perf_dir):
-            os.makedirs(merge_perf_dir)
-          results_filename = os.path.join(
-              merge_perf_dir, 'merged_perf_results.json')
-          _merge_perf_results(benchmark_name, results_filename, directories)
-        else:
-          # It was only written to one shard, use that shards data
-          results_filename = join(directories[0], 'perf_results.json')
-        print 'Uploading perf results from %s benchmark' % benchmark_name
-        upload_fail = _upload_and_write_perf_data_to_logfile(
-            benchmark_name, results_filename, configuration_name,
-            build_properties, oauth_file, tmpfile_dir, logdog_dict,
-            ('.reference' in benchmark_name))
-        upload_failure = upload_failure or upload_fail
-        upload_end_time = time.time()
-        print_duration(('%s upload time' % (benchmark_name)),
-                       upload_begin_time, upload_end_time)
+    for benchmark_name, output_file in results_dict.iteritems():
+      print "Benchmark: %s, file: %s" % (benchmark_name, output_file)
+      failure = benchmark_upload_result_map[benchmark_name]
+      upload_failure = upload_failure or failure
+      is_reference = '.reference' in benchmark_name
+      _write_perf_data_to_logfile(
+          benchmark_name, output_file,
+          configuration_name, build_properties, logdog_dict,
+          is_reference, failure)
 
     logdog_file_name = _generate_unique_logdog_filename('Results_Dashboard_')
     logdog_stream = logdog_helper.text(logdog_file_name,
-                                       json.dumps(logdog_dict, sort_keys=True,
+                                       json.dumps(dict(logdog_dict), sort_keys=True,
                                                   indent=4, separators=(',', ': ')),
                                        content_type=JSON_CONTENT_TYPE)
     if upload_failure:
@@ -388,18 +438,15 @@ def _handle_perf_results(
     shutil.rmtree(tmpfile_dir)
 
 
-def _upload_and_write_perf_data_to_logfile(benchmark_name, results_file,
-    configuration_name, build_properties, oauth_file,
-    tmpfile_dir, logdog_dict, is_ref):
-  upload_failure = False
+def _write_perf_data_to_logfile(benchmark_name, output_file,
+    configuration_name, build_properties,
+    logdog_dict, is_ref, upload_failure):
   # logdog file to write perf results to
   output_json_file = logdog_helper.open_text(benchmark_name)
-
-  # upload results and write perf results to logdog file
-  upload_failure = _upload_perf_results(
-      results_file,
-      benchmark_name, configuration_name, build_properties,
-      oauth_file, tmpfile_dir, output_json_file)
+  with open(output_file) as f:
+    results = json.load(f)
+    json.dump(results, output_json_file,
+              indent=4, separators=(',', ': '))
   output_json_file.close()
...
@@ -427,7 +474,6 @@ def _upload_and_write_perf_data_to_logfile(benchmark_name, results_file,
     logdog_dict[base_benchmark_name]['perf_results'] = \
         output_json_file.get_viewer_url()
-  return upload_failure
 
 
 def print_duration(step, start, end):
   print 'Duration of %s: %d seconds' % (step, end-start)
...
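
A closing note on the _upload_individual_benchmark shim above: Pool.map hands each worker a single argument, and Python 2's multiprocessing has no starmap, so packing the per-benchmark arguments into a tuple and unpacking them with * is the usual workaround. A condensed, runnable view of the same round trip, with toy data in place of the CL's invocation tuples:

  import multiprocessing as mp

  def work(params):
    name, value = params             # unpack the single mapped argument
    return (name, value % 2 == 0)    # same (name, failure_flag) shape

  if __name__ == '__main__':
    pool = mp.Pool(2)
    outcome = dict(pool.map(work, [('a', 1), ('b', 2)]))
    # outcome == {'a': False, 'b': True}; any() yields the overall bit.
    print(any(outcome.values()))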