Commit fbbdb5fe authored by Andrew Grieve, committed by Commit Bot

Android: Use non-dummy multiprocessing in compile_resources.py

Will be more relevant if we use more arsc filters.
Even so, speeds up command on my machine: 10.3s -> 7.7s

Bug: 636448
Change-Id: I9b3db23782282e3e13c613b0b64ff1375fb1422d
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2186756
Commit-Queue: Andrew Grieve <agrieve@chromium.org>
Reviewed-by: Samuel Huang <huangs@chromium.org>
Cr-Commit-Position: refs/heads/master@{#766469}
parent 5dbbd539
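For context before the diff: the change swaps a thread pool (multiprocessing.dummy.Pool, where CPU-bound work is still serialized by the GIL) for fork()-based worker processes via the new util/parallel.py helper. Below is a minimal sketch of the two patterns; it is not part of the commit, and the function names are illustrative only.

# Illustrative sketch only -- not from this CL. Assumes util/parallel.py is
# importable (it is added by this change) and that _Work is a module-level
# function so it can run in a forked worker.
import multiprocessing.dummy

from util import parallel


def _Work(value):
  return value * value


def RunWithThreadPool(values):
  # Old pattern: "dummy" means threads in the same process, not subprocesses.
  pool = multiprocessing.dummy.Pool(10)
  try:
    return pool.map(_Work, values)
  finally:
    pool.close()
    pool.join()


def RunWithForkAndCall(values):
  # New pattern: each call runs in a fork()'ed worker process; results are
  # yielded as they complete, in arbitrary order.
  return list(parallel.BulkForkAndCall(_Work, [(v, ) for v in values]))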
@@ -18,7 +18,6 @@ import contextlib
import filecmp
import hashlib
import logging
-import multiprocessing.dummy
import os
import re
import shutil
@@ -26,7 +25,6 @@ import subprocess
import sys
import tempfile
import textwrap
-import time
import zipfile
from xml.etree import ElementTree
@@ -34,9 +32,11 @@ from util import build_utils
from util import diff_utils
from util import manifest_utils
from util import md5_check
+from util import parallel
from util import protoresources
from util import resource_utils
# Pngs that we shouldn't convert to webp. Please add rationale when updating.
_PNG_WEBP_EXCLUSION_PATTERN = re.compile('|'.join([
# Crashes on Galaxy S5 running L (https://crbug.com/807059).
@@ -546,68 +546,64 @@ def _CreateKeepPredicate(resource_exclusion_regex,
build_utils.MatchesGlob(path, resource_exclusion_exceptions))
-def _ConvertToWebP(webp_binary, png_paths, path_info, webp_cache_dir):
-  pool = multiprocessing.dummy.Pool(10)
-
-  build_utils.MakeDirectory(webp_cache_dir)
-  cwebp_version = subprocess.check_output([webp_binary, '-version']).rstrip()
-  cwebp_arguments = ['-mt', '-quiet', '-m', '6', '-q', '100', '-lossless']
-
-  sha1_time = [0]
-  cwebp_time = [0]
-  cache_hits = [0]
-
-  def cal_sha1(png_path):
-    start = time.time()
-    with open(png_path, 'rb') as f:
-      png_content = f.read()
-
-    sha1_hex = hashlib.sha1(png_content).hexdigest()
-    sha1_time[0] += time.time() - start
-    return sha1_hex
-
-  def get_converted_image(png_path):
-    sha1_hash = cal_sha1(png_path)
-
-    webp_cache_path = os.path.join(
-        webp_cache_dir, '{}-{}-{}'.format(sha1_hash, cwebp_version,
-                                          ''.join(cwebp_arguments)))
-    # No need to add an extension, Android can load images fine without them.
-    webp_path = os.path.splitext(png_path)[0]
-
-    if os.path.exists(webp_cache_path):
-      cache_hits[0] += 1
-      os.link(webp_cache_path, webp_path)
-    else:
-      # We place the generated webp image to webp_path, instead of in the
-      # webp_cache_dir to avoid concurrency issues.
-      start = time.time()
-      args = [webp_binary, png_path] + cwebp_arguments + ['-o', webp_path]
-      subprocess.check_call(args)
-      cwebp_time[0] += time.time() - start
-
-      try:
-        os.link(webp_path, webp_cache_path)
-      except OSError:
-        # Because of a concurrent run, a webp image may already exist in
-        # webp_cache_path.
-        pass
-
-    os.remove(png_path)
-    original_dir = os.path.dirname(os.path.dirname(png_path))
-    path_info.RegisterRename(
-        os.path.relpath(png_path, original_dir),
-        os.path.relpath(webp_path, original_dir))
-
-  png_paths = [f for f in png_paths if not _PNG_WEBP_EXCLUSION_PATTERN.match(f)]
-  try:
-    pool.map(get_converted_image, png_paths)
-  finally:
-    pool.close()
-    pool.join()
-  logging.debug('png->webp: cache: %d/%d sha1 time: %.1fms cwebp time: %.1fms',
-                cache_hits[0], len(png_paths), sha1_time[0], cwebp_time[0])
+def _ComputeSha1(path):
+  with open(path, 'rb') as f:
+    data = f.read()
+  return hashlib.sha1(data).hexdigest()
+
+
+def _ConvertToWebPSingle(png_path, cwebp_binary, cwebp_version, webp_cache_dir):
+  sha1_hash = _ComputeSha1(png_path)
+
+  # The set of arguments that will appear in the cache key.
+  quality_args = ['-m', '6', '-q', '100', '-lossless']
+
+  webp_cache_path = os.path.join(
+      webp_cache_dir, '{}-{}-{}'.format(sha1_hash, cwebp_version,
+                                        ''.join(quality_args)))
+  # No need to add .webp. Android can load images fine without them.
+  webp_path = os.path.splitext(png_path)[0]
+
+  cache_hit = os.path.exists(webp_cache_path)
+  if cache_hit:
+    os.link(webp_cache_path, webp_path)
+  else:
+    # We place the generated webp image to webp_path, instead of in the
+    # webp_cache_dir to avoid concurrency issues.
+    args = [cwebp_binary, png_path, '-o', webp_path, '-quiet'] + quality_args
+    subprocess.check_call(args)
+
+    try:
+      os.link(webp_path, webp_cache_path)
+    except OSError:
+      # Because of a concurrent run, a webp image may already exist in
+      # webp_cache_path.
+      pass
+
+  os.remove(png_path)
+  original_dir = os.path.dirname(os.path.dirname(png_path))
+  rename_tuple = (os.path.relpath(png_path, original_dir),
+                  os.path.relpath(webp_path, original_dir))
+  return rename_tuple, cache_hit
+
+
+def _ConvertToWebP(cwebp_binary, png_paths, path_info, webp_cache_dir):
+  cwebp_version = subprocess.check_output([cwebp_binary, '-version']).rstrip()
+  shard_args = [(f, ) for f in png_paths
+                if not _PNG_WEBP_EXCLUSION_PATTERN.match(f)]
+
+  build_utils.MakeDirectory(webp_cache_dir)
+  results = parallel.BulkForkAndCall(_ConvertToWebPSingle,
+                                     shard_args,
+                                     cwebp_binary=cwebp_binary,
+                                     cwebp_version=cwebp_version,
+                                     webp_cache_dir=webp_cache_dir)
+  total_cache_hits = 0
+  for rename_tuple, cache_hit in results:
+    path_info.RegisterRename(*rename_tuple)
+    total_cache_hits += int(cache_hit)
+
+  logging.debug('png->webp cache: %d/%d', total_cache_hits, len(shard_args))
def _RemoveImageExtensions(directory, path_info):
@@ -627,10 +623,9 @@ def _RemoveImageExtensions(directory, path_info):
os.path.relpath(path_no_extension, directory))
-def _CompileSingleDep(args):
-  index, dep_path, aapt2_path, partials_dir, exclusion_rules = args
-  basename = os.path.basename(dep_path)
-  unique_name = '{}_{}'.format(index, basename)
+def _CompileSingleDep(index, dep_subdir, keep_predicate, aapt2_path,
+                      partials_dir):
+  unique_name = '{}_{}'.format(index, os.path.basename(dep_subdir))
  partial_path = os.path.join(partials_dir, '{}.zip'.format(unique_name))
  compile_command = [
@@ -639,7 +634,7 @@
      # TODO(wnwen): Turn this on once aapt2 forces 9-patch to be crunched.
      # '--no-crunch',
      '--dir',
-      dep_path,
+      dep_subdir,
      '-o',
      partial_path
  ]
@@ -654,33 +649,16 @@
  # Filtering these files is expensive, so only apply filters to the partials
  # that have been explicitly targeted.
-  keep_predicate = _CreateValuesKeepPredicate(exclusion_rules, dep_path)
  if keep_predicate:
-    logging.debug('Applying .arsc filtering to %s', dep_path)
+    logging.debug('Applying .arsc filtering to %s', dep_subdir)
    protoresources.StripUnwantedResources(partial_path, keep_predicate)
  return partial_path


-def _CompileDeps(aapt2_path, dep_subdirs, temp_dir, exclusion_rules):
-  partials_dir = os.path.join(temp_dir, 'partials')
-  build_utils.MakeDirectory(partials_dir)
-
-  def iter_params():
-    for i, dep_path in enumerate(dep_subdirs):
-      yield i, dep_path, aapt2_path, partials_dir, exclusion_rules
-
-  pool = multiprocessing.dummy.Pool(10)
-  try:
-    return pool.map(_CompileSingleDep, iter_params())
-  finally:
-    pool.close()
-    pool.join()
-
-def _CreateValuesKeepPredicate(exclusion_rules, dep_path):
+def _CreateValuesKeepPredicate(exclusion_rules, dep_subdir):
  patterns = [
      x[1] for x in exclusion_rules
-      if build_utils.MatchesGlob(dep_path, [x[0]])
+      if build_utils.MatchesGlob(dep_subdir, [x[0]])
  ]
  if not patterns:
    return None
@@ -689,6 +667,23 @@
  return lambda x: not any(r.search(x) for r in regexes)


+def _CompileDeps(aapt2_path, dep_subdirs, temp_dir, exclusion_rules):
+  partials_dir = os.path.join(temp_dir, 'partials')
+  build_utils.MakeDirectory(partials_dir)
+
+  job_params = [(i, dep_subdir,
+                 _CreateValuesKeepPredicate(exclusion_rules, dep_subdir))
+                for i, dep_subdir in enumerate(dep_subdirs)]
+
+  # Filtering is slow, so ensure jobs with keep_predicate are started first.
+  job_params.sort(key=lambda x: not x[2])
+  return list(
+      parallel.BulkForkAndCall(_CompileSingleDep,
+                               job_params,
+                               aapt2_path=aapt2_path,
+                               partials_dir=partials_dir))
+
+
def _CreateResourceInfoFile(path_info, info_path, dependencies_res_zips):
  for zip_file in dependencies_res_zips:
    zip_info_file_path = zip_file + '.info'
@@ -55,5 +55,6 @@ util/build_utils.py
util/diff_utils.py
util/manifest_utils.py
util/md5_check.py
+util/parallel.py
util/protoresources.py
util/resource_utils.py
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import logging
import multiprocessing
import os
import sys
import threading
import traceback

DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

_all_pools = None
_is_child_process = False
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
_fork_params = None
_fork_kwargs = None


class _ImmediateResult(object):
  def __init__(self, value):
    self._value = value

  def get(self):
    return self._value

  def wait(self):
    pass

  def ready(self):
    return True

  def successful(self):
    return True


class _ExceptionWrapper(object):
  """Used to marshal exception messages back to main process."""

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    if self.exception_type:
      raise getattr(__builtins__,
                    self.exception_type)('Originally caused by: ' + self.msg)


class _FuncWrapper(object):
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    global _is_child_process
    _is_child_process = True
    self._func = func

  def __call__(self, index, _=None):
    try:
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      if hasattr(__builtins__, type(e).__name__):
        exception_type = type(e).__name__
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult(object):
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters associated pool _all_pools.
  * Raises exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def get(self):
    self.wait()
    value = self._result.get()
    _CheckForException(value)
    return value

  def wait(self):
    self._result.wait()
    if self._pool:
      _all_pools.remove(self._pool)
      self._pool = None

  def ready(self):
    return self._result.ready()

  def successful(self):
    return self._result.successful()


def _TerminatePools():
  """Calls .terminate() on all active process pools.

  Not supposed to be necessary according to the docs, but seems to be required
  when child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # Child processes cannot have pools, but atexit runs this function because
  # it was registered before fork()ing.
  if _is_child_process:
    return

  def close_pool(pool):
    try:
      pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for i, pool in enumerate(_all_pools):
    # Without calling terminate() on a separate thread, the call can block
    # forever.
    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
                              target=close_pool,
                              args=(pool, ))
    thread.daemon = True
    thread.start()


def _CheckForException(value):
  if isinstance(value, _ExceptionWrapper):
    global _silence_exceptions
    if not _silence_exceptions:
      value.MaybeThrow()
      _silence_exceptions = True
      logging.error('Subprocess raised an exception:\n%s', value.msg)
    sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  ret = multiprocessing.Pool(pool_size)
  _fork_params = None
  _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    pool = None
    result = _ImmediateResult(func(*args))
  else:
    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    result = pool.apply_async(_FuncWrapper(func), (0, ))
    pool.close()
  return _WrappedResult(result, pool=pool)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Args:
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values as they come in.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    for result in pool.imap_unordered(wrapped_func, xrange(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)
@@ -192,12 +192,12 @@ def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  Args:
-    kwargs: Common key word arguments to be passed to |func|.
+    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values as they come in.
  """
  arg_tuples = list(arg_tuples)
-  if not len(arg_tuples):
+  if not arg_tuples:
    return

  if DISABLE_ASYNC:
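A usage note on the helper added above (a sketch, not from the commit; the file path and function names are hypothetical): ForkAndCall runs a single call in a forked process and returns a result object whose .get() blocks and re-raises any exception marshalled back from the child, while setting DISABLE_ASYNC=1 in the environment makes both helpers run synchronously, which is handy when debugging.

# Sketch only. Set DISABLE_ASYNC=1 in the environment to run in-process
# while debugging a failure inside a worker.
import hashlib

from util import parallel


def _Sha1OfFile(path):
  with open(path, 'rb') as f:
    return hashlib.sha1(f.read()).hexdigest()


def main():
  # 'some/image.png' is a placeholder path for illustration.
  result = parallel.ForkAndCall(_Sha1OfFile, ('some/image.png', ))
  # .get() waits for the child process and raises if the child raised.
  print(result.get())


if __name__ == '__main__':
  main()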