Commit 4dd7097d authored by Samuel Huang, committed by Commit Bot

[Supersize] Add kwargs support to concurrent.BulkForkAndCall().

concurrent.BulkForkAndCall(func, arg_tuples) calls |func()| with
arguments taken from |arg_tuples|, a list of tuples holding the
arguments of each independent forked call. Previously this required
common parameters (e.g., tool_prefix) to be repeated in every tuple,
which made the |arg_tuples| preparation code less flexible.

This CL adds a |kwargs| parameter to BulkForkAndCall(), allowing
common parameters to be given once. As seen in the test changes, a
minor caveat is that |func()| must accept the common parameters at
the end of its argument list.
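
Sketch of the intended call pattern (illustrative only; |_Analyze|,
|paths|, and |tool_prefix| are made-up names, not code from this CL):

  # Before: the common parameter is repeated in every tuple.
  arg_tuples = [(path, tool_prefix) for path in paths]
  results = concurrent.BulkForkAndCall(_Analyze, arg_tuples)

  # After: tuples carry only per-call arguments; the common parameter
  # is given once, and |_Analyze| accepts it at the end of its
  # argument list.
  arg_tuples = [(path,) for path in paths]
  results = concurrent.BulkForkAndCall(
      _Analyze, arg_tuples, tool_prefix=tool_prefix)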

Bug: 723798
Change-Id: I9aa91b6081b8fe8fa9c87a9ae1cb959d3f17e9df
Reviewed-on: https://chromium-review.googlesource.com/1135204
Commit-Queue: Samuel Huang <huangs@chromium.org>
Reviewed-by: agrieve <agrieve@chromium.org>
Cr-Commit-Position: refs/heads/master@{#574934}
parent 699f0e72
@@ -26,6 +26,7 @@ _silence_exceptions = False
 
 # Used to pass parameters to forked processes without pickling.
 _fork_params = None
+_fork_kwargs = None
 
 
 class _ImmediateResult(object):
@@ -66,7 +67,7 @@ class _FuncWrapper(object):
   def __call__(self, index, _=None):
     try:
-      return self._func(*_fork_params[index])
+      return self._func(*_fork_params[index], **_fork_kwargs)
     except Exception, e:
       # Only keep the exception type for builtin exception types or else risk
       # further marshalling exceptions.
@@ -150,14 +151,18 @@ def _CheckForException(value):
     sys.exit(1)
 
 
-def _MakeProcessPool(job_params):
+def _MakeProcessPool(job_params, **job_kwargs):
   global _all_pools
   global _fork_params
+  global _fork_kwargs
   assert _fork_params is None
+  assert _fork_kwargs is None
   pool_size = min(len(job_params), multiprocessing.cpu_count())
   _fork_params = job_params
+  _fork_kwargs = job_kwargs
   ret = multiprocessing.Pool(pool_size)
   _fork_params = None
+  _fork_kwargs = None
   if _all_pools is None:
     _all_pools = []
     atexit.register(_TerminatePools)
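
(For context, a minimal sketch of the no-pickling trick that
_MakeProcessPool relies on, assuming a fork()-based multiprocessing
start method as on Linux; the names below are illustrative, not from
this CL. The globals are assigned just before the Pool forks its
workers, so children inherit them by memory copy rather than pickling,
and the parent immediately resets them:)

  import multiprocessing

  _inherited = None

  def _worker(i):
    # Children read the global captured at fork() time.
    return _inherited[i] * 2

  def demo():
    global _inherited
    _inherited = [10, 20, 30]  # Set before fork: inherited by children.
    pool = multiprocessing.Pool(3)
    _inherited = None  # Parent no longer needs it; children keep theirs.
    return pool.map(_worker, range(3))  # [20, 40, 60]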
@@ -175,15 +180,18 @@ def ForkAndCall(func, args, decode_func=None):
     pool = None
     result = _ImmediateResult(func(*args))
   else:
-    pool = _MakeProcessPool([args])
+    pool = _MakeProcessPool([args])  # Omit |kwargs|.
     result = pool.apply_async(_FuncWrapper(func), (0,))
   pool.close()
   return _WrappedResult(result, pool=pool, decode_func=decode_func)
 
 
-def BulkForkAndCall(func, arg_tuples):
+def BulkForkAndCall(func, arg_tuples, **kwargs):
   """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
 
+  Args:
+    kwargs: Common keyword arguments to be passed to |func|.
+
   Yields the return values as they come in.
   """
   arg_tuples = list(arg_tuples)
@@ -192,10 +200,10 @@ def BulkForkAndCall(func, arg_tuples):
   if DISABLE_ASYNC:
     for args in arg_tuples:
-      yield func(*args)
+      yield func(*args, **kwargs)
     return
-  pool = _MakeProcessPool(arg_tuples)
+  pool = _MakeProcessPool(arg_tuples, **kwargs)
   wrapped_func = _FuncWrapper(func)
   for result in pool.imap_unordered(wrapped_func, xrange(len(arg_tuples))):
     _CheckForException(result)
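
(Behaviorally, per the DISABLE_ASYNC branch above, the new contract is
that BulkForkAndCall(func, arg_tuples, **kwargs) yields the same values
as this serial form, except computed in forked processes and yielded in
arbitrary order:)

  def _serial_equivalent(func, arg_tuples, **kwargs):
    for args in arg_tuples:
      yield func(*args, **kwargs)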
@@ -215,7 +223,7 @@ def CallOnThread(func, *args, **kwargs):
   return result
 
 
-def EncodeDictOfLists(d, key_transform=None):
+def EncodeDictOfLists(d, key_transform=None, value_transform=None):
   """Serializes a dict where values are lists of strings.
 
   Does not support '' as keys, nor [''] as values.
@@ -226,7 +234,11 @@ def EncodeDictOfLists(d, key_transform=None):
   if key_transform:
     keys = (key_transform(k) for k in keys)
   keys = '\x01'.join(keys)
-  values = '\x01'.join('\x02'.join(x) for x in d.itervalues())
+  if value_transform:
+    values = '\x01'.join('\x02'.join(value_transform(y) for y in x) for x in
+                         d.itervalues())
+  else:
+    values = '\x01'.join('\x02'.join(x) for x in d.itervalues())
   return keys, values
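
(For illustration, the expected behavior of the new |value_transform|
hook, assuming the dict iterates 'a' then 'b'; this example is not part
of the CL:)

  d = {'a': ['x', 'y'], 'b': ['z']}
  keys, values = EncodeDictOfLists(d, value_transform=lambda s: s.upper())
  # keys   == 'a\x01b'
  # values == 'X\x02Y\x01Z'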
......
@@ -10,7 +10,8 @@ import unittest
 import concurrent
 
 
-def _ForkTestHelper(test_instance, parent_pid, arg1, arg2, _=None):
+def _ForkTestHelper(arg1, arg2, pickle_me_not, test_instance, parent_pid):
+  _ = pickle_me_not  # Suppress lint warning.
   test_instance.assertNotEquals(os.getpid(), parent_pid)
   return arg1 + arg2
@@ -96,12 +97,13 @@ class ConcurrentTest(unittest.TestCase):
   def testForkAndCall_normal(self):
     parent_pid = os.getpid()
     result = concurrent.ForkAndCall(
-        _ForkTestHelper, (self, parent_pid, 1, 2, Unpicklable()))
+        _ForkTestHelper, (1, 2, Unpicklable(), self, parent_pid))
     self.assertEquals(3, result.get())
 
   def testForkAndCall_exception(self):
     parent_pid = os.getpid()
-    result = concurrent.ForkAndCall(_ForkTestHelper, (self, parent_pid, 1, 'a'))
+    result = concurrent.ForkAndCall(
+        _ForkTestHelper, (1, 'a', None, self, parent_pid))
     self.assertRaises(TypeError, result.get)
 
   def testBulkForkAndCall_none(self):
@@ -111,20 +113,35 @@ class ConcurrentTest(unittest.TestCase):
   def testBulkForkAndCall_few(self):
     parent_pid = os.getpid()
     results = concurrent.BulkForkAndCall(_ForkTestHelper, [
-        (self, parent_pid, 1, 2, Unpicklable()),
-        (self, parent_pid, 3, 4)])
+        (1, 2, Unpicklable(), self, parent_pid),
+        (3, 4, None, self, parent_pid)])
     self.assertEquals({3, 7}, set(results))
 
+  def testBulkForkAndCall_few_kwargs(self):
+    parent_pid = os.getpid()
+    results = concurrent.BulkForkAndCall(_ForkTestHelper,
+        [(1, 2, Unpicklable()), (3, 4, None)],
+        test_instance=self, parent_pid=parent_pid)
+    self.assertEquals({3, 7}, set(results))
+
   def testBulkForkAndCall_many(self):
     parent_pid = os.getpid()
-    args = [(self, parent_pid, 1, 2, Unpicklable())] * 100
+    args = [(1, 2, Unpicklable(), self, parent_pid) for _ in xrange(100)]
     results = concurrent.BulkForkAndCall(_ForkTestHelper, args)
     self.assertEquals([3] * 100, list(results))
 
+  def testBulkForkAndCall_many_kwargs(self):
+    parent_pid = os.getpid()
+    args = [(1, 2) for _ in xrange(100)]
+    results = concurrent.BulkForkAndCall(
+        _ForkTestHelper, args, pickle_me_not=Unpicklable(), test_instance=self,
+        parent_pid=parent_pid)
+    self.assertEquals([3] * 100, list(results))
+
   def testBulkForkAndCall_exception(self):
     parent_pid = os.getpid()
     results = concurrent.BulkForkAndCall(_ForkTestHelper, [
-        (self, parent_pid, 1, 'a')])
+        (1, 'a', self, parent_pid)])
     self.assertRaises(TypeError, results.next)
 
 if __name__ == '__main__':
......
@@ -487,19 +487,21 @@ class _BulkObjectFileAnalyzerWorker(object):
       for path in paths:
         # Note: _ResolveStringPieces relies upon .a not being grouped.
         if path.endswith('.a'):
-          yield path, self._tool_prefix, self._output_directory
+          yield (path,)
         else:
           object_paths.append(path)
 
       BATCH_SIZE = 50  # Chosen arbitrarily.
       for i in xrange(0, len(object_paths), BATCH_SIZE):
         batch = object_paths[i:i + BATCH_SIZE]
-        yield batch, self._tool_prefix, self._output_directory
+        yield (batch,)
 
     params = list(iter_job_params())
     # Order of the jobs doesn't matter since each job owns independent paths,
     # and our output is a dict where paths are the key.
-    results = concurrent.BulkForkAndCall(_RunNmOnIntermediates, params)
+    results = concurrent.BulkForkAndCall(
+        _RunNmOnIntermediates, params, tool_prefix=self._tool_prefix,
+        output_directory=self._output_directory)
 
     # Names are still mangled.
     all_paths_by_name = self._paths_by_name
@@ -530,12 +532,13 @@ class _BulkObjectFileAnalyzerWorker(object):
         (addr - adjust, s) for addr, s in elf_string_positions)
     string_data = _ReadFileChunks(elf_path, abs_string_positions)
 
-    params = (
-        (chunk, string_data, self._tool_prefix, self._output_directory)
+    params = ((chunk,)
         for chunk in self._encoded_string_addresses_by_path_chunks)
     # Order of the jobs doesn't matter since each job owns independent paths,
     # and our output is a dict where paths are the key.
-    results = concurrent.BulkForkAndCall(_ResolveStringPieces, params)
+    results = concurrent.BulkForkAndCall(
+        _ResolveStringPieces, params, string_data=string_data,
+        tool_prefix=self._tool_prefix, output_directory=self._output_directory)
     results = list(results)
 
     final_result = []
......