Commit 153aab14 authored by inferno's avatar inferno Committed by Commit bot

Cleanup ipc fuzzer. Add flags to pick random prefix on launch.

R=tsepez@chromium.org,mbarbella@chromium.org
BUG=450268

Review URL: https://codereview.chromium.org/888603003

Cr-Commit-Position: refs/heads/master@{#313790}
parent 7f7052a7
......@@ -24,7 +24,7 @@ class CFPackageBuilder:
def __init__(self):
self.fuzzer_list = FUZZER_LIST
def parse_args(self):
def parse_arguments(self):
desc = 'Builder of IPC fuzzer packages for ClusterFuzz'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--out-dir', dest='out_dir', default='out',
......@@ -33,7 +33,7 @@ class CFPackageBuilder:
help='Debug vs. Release build')
self.args = parser.parse_args()
def get_paths(self):
def set_application_paths(self):
script_path = os.path.realpath(__file__)
self.mutate_dir = os.path.dirname(script_path)
src_dir = os.path.join(self.mutate_dir, os.pardir, os.pardir, os.pardir)
......@@ -41,12 +41,12 @@ class CFPackageBuilder:
out_dir = os.path.join(src_dir, self.args.out_dir)
self.build_dir = os.path.join(out_dir, self.args.build_type)
def enter_tmp_workdir(self):
def switch_to_temp_work_directory(self):
self.old_cwd = os.getcwd()
self.work_dir = tempfile.mkdtemp()
os.chdir(self.work_dir)
def rm_tmp_workdir(self):
def remove_temp_work_directory(self):
os.chdir(self.old_cwd)
shutil.rmtree(self.work_dir)
......@@ -55,19 +55,23 @@ class CFPackageBuilder:
fuzzer_src_path = os.path.join(self.mutate_dir, fuzzer + '.py')
fuzzer_dst_path = os.path.join(fuzzer, 'run.py')
shutil.copyfile(fuzzer_src_path, fuzzer_dst_path)
utils_src_path = os.path.join(self.mutate_dir, 'utils.py')
utils_dst_path = os.path.join(fuzzer, 'utils.py')
shutil.copyfile(utils_src_path, utils_dst_path)
distutils.archive_util.make_zipfile(fuzzer, fuzzer)
package_name = fuzzer + '.zip'
shutil.copy(package_name, self.build_dir)
final_package_path = os.path.join(self.build_dir, package_name)
print 'Built %s' % final_package_path
print 'Built %s.' % final_package_path
def main(self):
self.parse_args()
self.get_paths()
self.enter_tmp_workdir()
self.parse_arguments()
self.set_application_paths()
self.switch_to_temp_work_directory()
for fuzzer in self.fuzzer_list:
self.build_package(fuzzer)
self.rm_tmp_workdir()
self.remove_temp_work_directory()
return 0
if __name__ == "__main__":
......
......@@ -8,73 +8,52 @@ GenerateTraits. Support of GenerateTraits for different types will be gradually
added.
"""
import argparse
import os
import random
import string
import subprocess
import sys
import tempfile
import time
import utils
# Number of IPC messages per ipcdump
NUM_IPC_MESSAGES = 1500
IPC_GENERATE_APPLICATION = 'ipc_fuzzer_generate'
IPC_REPLAY_APPLICATION = 'ipc_fuzzer_replay'
MAX_IPC_MESSAGES_PER_TESTCASE = 1500
def platform():
if sys.platform.startswith('win'):
return 'WINDOWS'
if sys.platform.startswith('linux'):
return 'LINUX'
if sys.platform == 'darwin':
return 'MAC'
assert False, 'Unknown platform'
def random_id(size=16, chars=string.ascii_lowercase):
return ''.join(random.choice(chars) for x in range(size))
def random_ipcdump_path(ipcdump_dir):
return os.path.join(ipcdump_dir, 'fuzz-' + random_id() + '.ipcdump')
class GenerationalFuzzer:
def parse_cf_args(self):
parser = argparse.ArgumentParser()
parser.add_argument('--input_dir')
parser.add_argument('--output_dir')
parser.add_argument('--no_of_files', type=int)
self.args = args = parser.parse_args();
if not args.input_dir or not args.output_dir or not args.no_of_files:
parser.print_help()
sys.exit(1)
def get_paths(self):
app_path_key = 'APP_PATH'
self.generate_binary = 'ipc_fuzzer_generate'
self.util_binary = 'ipc_message_util'
if platform() == 'WINDOWS':
self.generate_binary += '.exe'
self.util_binary += '.exe'
def parse_arguments(self):
self.args = utils.parse_arguments()
def set_application_paths(self):
chrome_application_path = utils.get_application_path()
chrome_application_directory = os.path.dirname(chrome_application_path)
self.ipc_generate_binary = utils.application_name_for_platform(
IPC_GENERATE_APPLICATION)
self.ipc_replay_binary = utils.application_name_for_platform(
IPC_REPLAY_APPLICATION)
self.ipc_generate_binary_path = os.path.join(
chrome_application_directory, self.ipc_generate_binary)
self.ipc_replay_binary_path = os.path.join(
chrome_application_directory, self.ipc_replay_binary)
def generate_ipcdump_testcase(self):
ipcdump_testcase_path = (
utils.random_ipcdump_testcase_path(self.args.output_dir))
num_ipc_messages = random.randint(1, MAX_IPC_MESSAGES_PER_TESTCASE)
count_option = '--count=%d' % num_ipc_messages
cmd = [self.ipc_generate_binary_path, count_option, ipcdump_testcase_path]
if app_path_key not in os.environ:
sys.exit('Env var %s should be set to chrome path' % app_path_key)
chrome_path = os.environ[app_path_key]
out_dir = os.path.dirname(chrome_path)
self.util_path = os.path.join(out_dir, self.util_binary)
self.generate_path = os.path.join(out_dir, self.generate_binary)
def generate_ipcdump(self):
generated_ipcdump = random_ipcdump_path(self.args.output_dir)
cmd = [self.generate_path,
'--count=' + str(NUM_IPC_MESSAGES),
generated_ipcdump]
if subprocess.call(cmd):
sys.exit('%s failed' % self.generate_binary)
sys.exit('%s failed.' % self.ipc_generate_binary)
utils.create_flags_file(ipcdump_testcase_path, self.ipc_replay_binary_path)
def main(self):
self.parse_cf_args()
self.get_paths()
for i in xrange(self.args.no_of_files):
self.generate_ipcdump()
self.parse_arguments()
self.set_application_paths()
for _ in xrange(self.args.no_of_files):
self.generate_ipcdump_testcase()
return 0
if __name__ == "__main__":
......
......@@ -11,93 +11,79 @@ This fuzzer will pick some ipcdumps from the corpus, concatenate them with
ipc_message_util and mutate the result with ipc_fuzzer_mutate.
"""
import argparse
import os
import random
import string
import subprocess
import sys
import tempfile
import time
import utils
# Number of ipcdumps to concatenate
NUM_IPCDUMPS = 50
def platform():
if sys.platform.startswith('win'):
return 'WINDOWS'
if sys.platform.startswith('linux'):
return 'LINUX'
if sys.platform == 'darwin':
return 'MAC'
assert False, 'Unknown platform'
def create_temp_file():
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.close()
return temp_file.name
def random_id(size=16, chars=string.ascii_lowercase):
return ''.join(random.choice(chars) for x in range(size))
def random_ipcdump_path(ipcdump_dir):
return os.path.join(ipcdump_dir, 'fuzz-' + random_id() + '.ipcdump')
IPC_MESSAGE_UTIL_APPLICATION = 'ipc_message_util'
IPC_MUTATE_APPLICATION = 'ipc_fuzzer_mutate'
IPC_REPLAY_APPLICATION = 'ipc_fuzzer_replay'
IPCDUMP_MERGE_LIMIT = 50
class MutationalFuzzer:
def parse_cf_args(self):
parser = argparse.ArgumentParser()
parser.add_argument('--input_dir')
parser.add_argument('--output_dir')
parser.add_argument('--no_of_files', type=int)
self.args = args = parser.parse_args();
if not args.input_dir or not args.output_dir or not args.no_of_files:
parser.print_help()
sys.exit(1)
def get_paths(self):
app_path_key = 'APP_PATH'
self.mutate_binary = 'ipc_fuzzer_mutate'
self.util_binary = 'ipc_message_util'
if platform() == 'WINDOWS':
self.mutate_binary += '.exe'
self.util_binary += '.exe'
if app_path_key not in os.environ:
sys.exit('Env var %s should be set to chrome path' % app_path_key)
chrome_path = os.environ[app_path_key]
out_dir = os.path.dirname(chrome_path)
self.util_path = os.path.join(out_dir, self.util_binary)
self.mutate_path = os.path.join(out_dir, self.mutate_binary)
def list_corpus(self):
input_dir = self.args.input_dir
entries = os.listdir(input_dir)
entries = [i for i in entries if i.endswith('.ipcdump')]
self.corpus = [os.path.join(input_dir, entry) for entry in entries]
def create_mutated_ipcdump(self):
ipcdumps = ','.join(random.sample(self.corpus, NUM_IPCDUMPS))
tmp_ipcdump = create_temp_file()
mutated_ipcdump = random_ipcdump_path(self.args.output_dir)
# concatenate ipcdumps -> tmp_ipcdump
cmd = [self.util_path, ipcdumps, tmp_ipcdump]
def parse_arguments(self):
self.args = utils.parse_arguments()
def set_application_paths(self):
chrome_application_path = utils.get_application_path()
chrome_application_directory = os.path.dirname(chrome_application_path)
self.ipc_message_util_binary = utils.application_name_for_platform(
IPC_MESSAGE_UTIL_APPLICATION)
self.ipc_mutate_binary = utils.application_name_for_platform(
IPC_MUTATE_APPLICATION)
self.ipc_replay_binary = utils.application_name_for_platform(
IPC_REPLAY_APPLICATION)
self.ipc_message_util_binary_path = os.path.join(
chrome_application_directory, self.ipc_message_util_binary)
self.ipc_mutate_binary_path = os.path.join(
chrome_application_directory, self.ipc_mutate_binary)
self.ipc_replay_binary_path = os.path.join(
chrome_application_directory, self.ipc_replay_binary)
def set_corpus(self):
input_directory = self.args.input_dir
entries = os.listdir(input_directory)
entries = [i for i in entries if i.endswith(utils.IPCDUMP_EXTENSION)]
self.corpus = [os.path.join(input_directory, entry) for entry in entries]
def create_mutated_ipcdump_testcase(self):
ipcdumps = ','.join(random.sample(self.corpus, IPCDUMP_MERGE_LIMIT))
tmp_ipcdump_testcase = utils.create_temp_file()
mutated_ipcdump_testcase = (
utils.random_ipcdump_testcase_path(self.args.output_dir))
# Concatenate ipcdumps -> tmp_ipcdump.
cmd = [
self.ipc_message_util_binary_path,
ipcdumps,
tmp_ipcdump_testcase,
]
if subprocess.call(cmd):
sys.exit('%s failed' % self.util_binary)
# mutate tmp_ipcdump -> mutated_ipcdump
cmd = [self.mutate_path, tmp_ipcdump, mutated_ipcdump]
sys.exit('%s failed.' % self.ipc_message_util_binary)
# Mutate tmp_ipcdump -> mutated_ipcdump.
cmd = [
self.ipc_mutate_binary_path,
tmp_ipcdump_testcase,
mutated_ipcdump_testcase,
]
if subprocess.call(cmd):
sys.exit('%s failed' % self.mutate_binary)
os.remove(tmp_ipcdump)
sys.exit('%s failed.' % self.ipc_mutate_binary)
utils.create_flags_file(
mutated_ipcdump_testcase, self.ipc_replay_binary_path)
os.remove(tmp_ipcdump_testcase)
def main(self):
self.parse_cf_args()
self.get_paths()
self.list_corpus()
for i in xrange(self.args.no_of_files):
self.create_mutated_ipcdump()
self.parse_arguments()
self.set_application_paths()
self.set_corpus()
for _ in xrange(self.args.no_of_files):
self.create_mutated_ipcdump_testcase()
return 0
if __name__ == "__main__":
......
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utility functions used by Generational and Mutational ClusterFuzz
fuzzers."""
import argparse
import os
import random
import string
import sys
import tempfile
APP_PATH_KEY = 'APP_PATH'
FLAGS_PREFIX = 'flags-'
FUZZ_PREFIX = 'fuzz-'
IPC_REPLAY_APPLICATION = 'ipc_fuzzer_replay'
IPCDUMP_EXTENSION = '.ipcdump'
LAUNCH_PREFIXES = [
'--gpu-launcher',
'--plugin-launcher',
'--ppapi-plugin-launcher',
'--renderer-cmd-prefix',
'--utility-cmd-prefix',
]
def application_name_for_platform(application_name):
"""Return application name for current platform."""
if platform() == 'WINDOWS':
return application_name + '.exe'
return application_name
def create_flags_file(ipcdump_testcase_path, ipc_replay_application_path):
"""Create a flags file to add launch prefix to application command line."""
random_launch_prefix = random.choice(LAUNCH_PREFIXES)
file_content = '%s=%s' % (random_launch_prefix, ipc_replay_application_path)
flags_file_path = ipcdump_testcase_path.replace(FUZZ_PREFIX, FLAGS_PREFIX)
file_handle = open(flags_file_path, 'w')
file_handle.write(file_content)
file_handle.close()
def create_temp_file():
"""Create a temporary file."""
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.close()
return temp_file.name
def parse_arguments():
"""Parse fuzzer arguments."""
parser = argparse.ArgumentParser()
parser.add_argument('--input_dir')
parser.add_argument('--output_dir')
parser.add_argument('--no_of_files', type=int)
args = parser.parse_args();
if (not args.input_dir or
not args.output_dir or
not args.no_of_files):
parser.print_help()
sys.exit(1)
return args
def random_id(size=16, chars=string.ascii_lowercase):
"""Return a random id string, default 16 characters long."""
return ''.join(random.choice(chars) for _ in range(size))
def random_ipcdump_testcase_path(ipcdump_directory):
"""Return a random ipc testcase path."""
return os.path.join(
ipcdump_directory,
'%s%s%s' % (FUZZ_PREFIX, random_id(), IPCDUMP_EXTENSION))
def platform():
"""Return running platform."""
if sys.platform.startswith('win'):
return 'WINDOWS'
if sys.platform.startswith('linux'):
return 'LINUX'
if sys.platform == 'darwin':
return 'MAC'
assert False, 'Unknown platform'
def get_application_path():
"""Return chrome application path."""
if APP_PATH_KEY not in os.environ:
sys.exit(
'Environment variable %s should be set to chrome path.' % APP_PATH_KEY)
return os.environ[APP_PATH_KEY]
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment