Commit 41ddc64a authored by stgao's avatar stgao Committed by Commit bot

[Findit] Fix threading.

BUG=
NOTRY=true

Review URL: https://codereview.chromium.org/525433003

Cr-Commit-Position: refs/heads/master@{#292985}
parent 94eba845
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be # Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. # found in the LICENSE file.
from threading import Lock, Thread from threading import Lock
from common import utils from common import utils
import crash_utils import crash_utils
...@@ -77,7 +77,7 @@ class BlameList(object): ...@@ -77,7 +77,7 @@ class BlameList(object):
""" """
# Only return blame information for first 'top_n_frames' frames. # Only return blame information for first 'top_n_frames' frames.
stack_frames = callstack.GetTopNFrames(top_n_frames) stack_frames = callstack.GetTopNFrames(top_n_frames)
threads = [] tasks = []
# Iterate through frames in stack. # Iterate through frames in stack.
for stack_frame in stack_frames: for stack_frame in stack_frames:
# If the component this line is from does not have a crash revision, # If the component this line is from does not have a crash revision,
...@@ -102,17 +102,14 @@ class BlameList(object): ...@@ -102,17 +102,14 @@ class BlameList(object):
range_start = int(component_object['old_revision']) range_start = int(component_object['old_revision'])
range_end = int(component_object['new_revision']) range_end = int(component_object['new_revision'])
# Generate blame entry, one thread for one entry. # Create a task to generate blame entry.
blame_thread = Thread( tasks.append({
target=self.__GenerateBlameEntry, 'function': self.__GenerateBlameEntry,
args=[repository_parser, stack_frame, crash_revision, 'args': [repository_parser, stack_frame, crash_revision,
range_start, range_end]) range_start, range_end]})
threads.append(blame_thread)
blame_thread.start() # Run all the tasks.
crash_utils.RunTasks(tasks)
# Join the results before returning.
for blame_thread in threads:
blame_thread.join()
def __GenerateBlameEntry(self, repository_parser, stack_frame, def __GenerateBlameEntry(self, repository_parser, stack_frame,
crash_revision, range_start, range_end): crash_revision, range_start, range_end):
......
...@@ -6,6 +6,8 @@ import cgi ...@@ -6,6 +6,8 @@ import cgi
import ConfigParser import ConfigParser
import json import json
import os import os
import Queue
import threading
import time import time
from common import utils from common import utils
...@@ -14,6 +16,56 @@ from result import Result ...@@ -14,6 +16,56 @@ from result import Result
INFINITY = float('inf') INFINITY = float('inf')
MAX_THREAD_NUMBER = 10
TASK_QUEUE = None
def Worker():
global TASK_QUEUE
while True:
function, args, kwargs, result_semaphore = TASK_QUEUE.get()
try:
function(*args, **kwargs)
except:
pass
finally:
# Signal one task is done in case of exception.
result_semaphore.release()
def RunTasks(tasks):
"""Run given tasks. Not thread-safe: no concurrent calls of this function.
Return after all tasks were completed. A task is a dict as below:
{
'function': the function to call,
'args': the positional argument to pass to the function,
'kwargs': the key-value arguments to pass to the function,
}
"""
if not tasks:
return
global TASK_QUEUE
if not TASK_QUEUE:
TASK_QUEUE = Queue.Queue()
for index in range(MAX_THREAD_NUMBER):
thread = threading.Thread(target=Worker, name='worker_%s' % index)
# Set as daemon, so no join is needed.
thread.daemon = True
thread.start()
result_semaphore = threading.Semaphore(0)
# Push task to task queue for execution.
for task in tasks:
TASK_QUEUE.put(
(task['function'], task.get('args', []),
task.get('kwargs', {}), result_semaphore))
# Wait until all tasks to be executed.
for _ in tasks:
result_semaphore.acquire()
def GetRepositoryType(revision_number): def GetRepositoryType(revision_number):
"""Returns the repository type of this revision number. """Returns the repository type of this revision number.
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# found in the LICENSE file. # found in the LICENSE file.
import os import os
from threading import Lock, Thread from threading import Lock
import blame import blame
from common import utils from common import utils
...@@ -139,8 +139,8 @@ def FindMatch(revisions_info_map, file_to_revision_info, file_to_crash_info, ...@@ -139,8 +139,8 @@ def FindMatch(revisions_info_map, file_to_revision_info, file_to_crash_info,
Matches, a set of match objects. Matches, a set of match objects.
""" """
matches = match_set.MatchSet(codereview_api_url) matches = match_set.MatchSet(codereview_api_url)
threads = []
tasks = []
# Iterate through the crashed files in the stacktrace. # Iterate through the crashed files in the stacktrace.
for crashed_file_path in file_to_crash_info: for crashed_file_path in file_to_crash_info:
# Ignore header file. # Ignore header file.
...@@ -172,17 +172,15 @@ def FindMatch(revisions_info_map, file_to_revision_info, file_to_crash_info, ...@@ -172,17 +172,15 @@ def FindMatch(revisions_info_map, file_to_revision_info, file_to_crash_info,
revision = revisions_info_map[cl] revision = revisions_info_map[cl]
match_thread = Thread( tasks.append({
target=GenerateMatchEntry, 'function': GenerateMatchEntry,
args=[matches, revision, cl, changed_file_path, functions, 'args':[matches, revision, cl, changed_file_path, functions,
component_path, component_name, crashed_line_numbers, component_path, component_name, crashed_line_numbers,
stack_frame_nums, file_change_type, stack_frame_nums, file_change_type,
repository_parser]) repository_parser]})
threads.append(match_thread)
match_thread.start()
for match_thread in threads: # Run all the tasks.
match_thread.join() crash_utils.RunTasks(tasks)
matches.RemoveRevertedCLs() matches.RemoveRevertedCLs()
...@@ -241,8 +239,7 @@ def FindMatchForCallstack( ...@@ -241,8 +239,7 @@ def FindMatchForCallstack(
components) components)
callstack_priority = callstack.priority callstack_priority = callstack.priority
# Iterate through all components and create new thread for each component. # Iterate through all components.
threads = []
for component_path in component_dict: for component_path in component_dict:
# If the component to consider in this callstack is not in the parsed list # If the component to consider in this callstack is not in the parsed list
# of components, ignore this one. # of components, ignore this one.
...@@ -251,15 +248,8 @@ def FindMatchForCallstack( ...@@ -251,15 +248,8 @@ def FindMatchForCallstack(
changelog = component_to_changelog_map[component_path] changelog = component_to_changelog_map[component_path]
file_to_crash_info = component_dict.GetFileDict(component_path) file_to_crash_info = component_dict.GetFileDict(component_path)
t = Thread( FindMatchForComponent(component_path, file_to_crash_info, changelog,
target=FindMatchForComponent, callstack_priority, results, results_lock)
args=[component_path, file_to_crash_info, changelog,
callstack_priority, results, results_lock])
threads.append(t)
t.start()
for t in threads:
t.join()
def FindMatchForStacktrace(stacktrace, components, def FindMatchForStacktrace(stacktrace, components,
...@@ -320,18 +310,10 @@ def FindMatchForStacktrace(stacktrace, components, ...@@ -320,18 +310,10 @@ def FindMatchForStacktrace(stacktrace, components,
revisions, revisions,
file_to_revision_map) file_to_revision_map)
# Create separate threads for each of the call stack in the stacktrace. # Analyze each of the call stacks in the stacktrace.
threads = []
for callstack in stacktrace.stack_list: for callstack in stacktrace.stack_list:
t = Thread( FindMatchForCallstack(callstack, components, component_to_changelog_map,
target=FindMatchForCallstack, results, results_lock)
args=[callstack, components, component_to_changelog_map,
results, results_lock])
threads.append(t)
t.start()
for t in threads:
t.join()
return results return results
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment