Commit bf76be72 authored by dschuff@chromium.org's avatar dschuff@chromium.org

Simplify Pnacl translation thread code

Primarily a merge of the streaming-related code into PnaclTranslationThread

R=jvoung@chromium.org,sehr@chromium.org
BUG=http://code.google.com/p/nativeclient/issues/detail?id=2195
TEST=nacl_integration


Review URL: https://chromiumcodereview.appspot.com/10827109

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@149464 0039d316-1c4b-4281-b951-d872f2087c98
parent b0ca0c2c
......@@ -65,7 +65,6 @@ common_inputs = [
'plugin.cc',
'pnacl_coordinator.cc',
'pnacl_resources.cc',
'pnacl_streaming_translate_thread.cc',
'pnacl_translate_thread.cc',
'scriptable_plugin.cc',
'sel_ldr_launcher_chrome.cc',
......
......@@ -15,7 +15,6 @@
'plugin.cc',
'pnacl_coordinator.cc',
'pnacl_resources.cc',
'pnacl_streaming_translate_thread.cc',
'pnacl_translate_thread.cc',
'scriptable_plugin.cc',
'sel_ldr_launcher_chrome.cc',
......
......@@ -13,7 +13,6 @@
#include "native_client/src/trusted/plugin/manifest.h"
#include "native_client/src/trusted/plugin/plugin.h"
#include "native_client/src/trusted/plugin/plugin_error.h"
#include "native_client/src/trusted/plugin/pnacl_streaming_translate_thread.h"
#include "native_client/src/trusted/plugin/pnacl_translate_thread.h"
#include "native_client/src/trusted/plugin/service_runtime.h"
#include "native_client/src/trusted/plugin/temporary_file.h"
......@@ -440,7 +439,7 @@ void PnaclCoordinator::CachedFileDidOpen(int32_t pp_error) {
// Create the translation thread object immediately. This ensures that any
// pieces of the file that get downloaded before the compilation thread
// is accepting SRPCs won't get dropped.
translate_thread_.reset(new PnaclStreamingTranslateThread());
translate_thread_.reset(new PnaclTranslateThread());
if (translate_thread_ == NULL) {
ReportNonPpapiError("could not allocate translation thread.");
return;
......@@ -484,10 +483,8 @@ void PnaclCoordinator::BitcodeStreamGotData(int32_t pp_error,
FileStreamData data) {
PLUGIN_PRINTF(("PnaclCoordinator::BitcodeStreamGotData (pp_error=%"
NACL_PRId32", data=%p)\n", pp_error, data ? &(*data)[0] : 0));
PnaclStreamingTranslateThread* thread =
static_cast<PnaclStreamingTranslateThread*>(translate_thread_.get());
DCHECK(thread);
thread->PutBytes(data, pp_error);
DCHECK(translate_thread_.get());
translate_thread_->PutBytes(data, pp_error);
}
StreamCallback PnaclCoordinator::GetCallback() {
......
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "native_client/src/trusted/plugin/pnacl_streaming_translate_thread.h"
#include "native_client/src/include/nacl_scoped_ptr.h"
#include "native_client/src/trusted/plugin/plugin.h"
#include "native_client/src/trusted/plugin/pnacl_resources.h"
#include "native_client/src/trusted/plugin/srpc_params.h"
#include "native_client/src/trusted/plugin/temporary_file.h"
namespace plugin {
PnaclStreamingTranslateThread::PnaclStreamingTranslateThread() : done_(false) {
NaClXMutexCtor(&cond_mu_);
NaClXCondVarCtor(&buffer_cond_);
}
PnaclStreamingTranslateThread::~PnaclStreamingTranslateThread() {
PLUGIN_PRINTF(("~PnaclTranslateThread (translate_thread=%p)\n", this));
SetSubprocessesShouldDie();
NaClThreadJoin(translate_thread_.get());
PLUGIN_PRINTF(("~PnaclTranslateThread joined\n"));
}
void PnaclStreamingTranslateThread::RunTranslate(
const pp::CompletionCallback& finish_callback,
const Manifest* manifest,
const Manifest* ld_manifest,
TempFile* obj_file,
LocalTempFile* nexe_file,
ErrorInfo* error_info,
PnaclResources* resources,
Plugin* plugin) {
PLUGIN_PRINTF(("PnaclStreamingTranslateThread::RunTranslate)\n"));
manifest_ = manifest;
ld_manifest_ = ld_manifest;
obj_file_ = obj_file;
nexe_file_ = nexe_file;
coordinator_error_info_ = error_info;
resources_ = resources;
plugin_ = plugin;
// Invoke llc followed by ld off the main thread. This allows use of
// blocking RPCs that would otherwise block the JavaScript main thread.
report_translate_finished_ = finish_callback;
translate_thread_.reset(new NaClThread);
if (translate_thread_ == NULL) {
TranslateFailed("could not allocate thread struct.");
return;
}
const int32_t kArbitraryStackSize = 128 * 1024;
if (!NaClThreadCreateJoinable(translate_thread_.get(),
DoTranslateThread,
this,
kArbitraryStackSize)) {
TranslateFailed("could not create thread.");
translate_thread_.reset(NULL);
}
}
// Called from main thread to send bytes to the translator.
void PnaclStreamingTranslateThread::PutBytes(std::vector<char>* bytes,
int count) {
PLUGIN_PRINTF(("PutBytes, this %p bytes %p, size %d, count %d\n", this, bytes,
bytes ? bytes->size(): 0, count));
size_t buffer_size = 0;
// If we are done (error or not), Signal the translation thread to stop.
if (count <= PP_OK) {
NaClXMutexLock(&cond_mu_);
done_ = true;
NaClXCondVarSignal(&buffer_cond_);
NaClXMutexUnlock(&cond_mu_);
return;
}
CHECK(bytes != NULL);
// Ensure that the buffer we send to the translation thread is the right size
// (count can be < the buffer size). This can be done without the lock.
buffer_size = bytes->size();
bytes->resize(count);
NaClXMutexLock(&cond_mu_);
data_buffers_.push_back(std::vector<char>());
bytes->swap(data_buffers_.back()); // Avoid copying the buffer data.
NaClXCondVarSignal(&buffer_cond_);
NaClXMutexUnlock(&cond_mu_);
// Ensure the buffer we send back to the coordinator is the expected size
bytes->resize(buffer_size);
}
void PnaclStreamingTranslateThread::DoTranslate() {
ErrorInfo error_info;
nacl::scoped_ptr<NaClSubprocess> llc_subprocess(
StartSubprocess(PnaclUrls::GetLlcUrl(), manifest_, &error_info));
if (llc_subprocess == NULL) {
TranslateFailed("Compile process could not be created: " +
error_info.message());
return;
}
// Run LLC.
SrpcParams params;
nacl::DescWrapper* llc_out_file = obj_file_->get_wrapper();
PluginReverseInterface* llc_reverse =
llc_subprocess->service_runtime()->rev_interface();
llc_reverse->AddTempQuotaManagedFile(obj_file_->identifier());
RegisterReverseInterface(llc_reverse);
if (!llc_subprocess->InvokeSrpcMethod("StreamInit",
"h",
&params,
llc_out_file->desc())) {
// StreamInit returns an error message if the RPC fails.
TranslateFailed(nacl::string("Stream init failed: ") +
nacl::string(params.outs()[0]->arrays.str));
return;
}
PLUGIN_PRINTF(("PnaclCoordinator: StreamInit successful\n"));
// llc process is started.
while(!done_ || data_buffers_.size() > 0) {
NaClXMutexLock(&cond_mu_);
while(!done_ && data_buffers_.size() == 0) {
NaClXCondVarWait(&buffer_cond_, &cond_mu_);
}
PLUGIN_PRINTF(("PnaclTranslateThread awake, done %d, size %d\n",
done_, data_buffers_.size()));
if (data_buffers_.size() > 0) {
std::vector<char> data;
data.swap(data_buffers_.front());
data_buffers_.pop_front();
NaClXMutexUnlock(&cond_mu_);
PLUGIN_PRINTF(("StreamChunk\n"));
if (!llc_subprocess->InvokeSrpcMethod("StreamChunk",
"C",
&params,
&data[0],
data.size())) {
TranslateFailed("Compile stream chunk failed.");
return;
}
PLUGIN_PRINTF(("StreamChunk Successful\n"));
} else {
NaClXMutexUnlock(&cond_mu_);
}
if (SubprocessesShouldDie()) {
TranslateFailed("Stopped by coordinator.");
return;
}
}
PLUGIN_PRINTF(("PnaclTranslateThread done with chunks\n"));
// Finish llc.
if(!llc_subprocess->InvokeSrpcMethod("StreamEnd",
"",
&params)) {
PLUGIN_PRINTF(("PnaclTranslateThread StreamEnd failed\n"));
TranslateFailed(params.outs()[3]->arrays.str);
return;
}
// LLC returns values that are used to determine how linking is done.
int is_shared_library = (params.outs()[0]->u.ival != 0);
nacl::string soname = params.outs()[1]->arrays.str;
nacl::string lib_dependencies = params.outs()[2]->arrays.str;
PLUGIN_PRINTF(("PnaclCoordinator: compile (translator=%p) succeeded"
" is_shared_library=%d, soname='%s', lib_dependencies='%s')\n",
this, is_shared_library, soname.c_str(),
lib_dependencies.c_str()));
// Shut down the llc subprocess.
RegisterReverseInterface(NULL);
llc_subprocess.reset(NULL);
if (SubprocessesShouldDie()) {
TranslateFailed("stopped by coordinator.");
return;
}
if(!RunLdSubprocess(is_shared_library, soname, lib_dependencies)) {
return;
}
pp::Core* core = pp::Module::Get()->core();
core->CallOnMainThread(0, report_translate_finished_, PP_OK);
}
void PnaclStreamingTranslateThread::SetSubprocessesShouldDie() {
PnaclTranslateThread::SetSubprocessesShouldDie();
nacl::MutexLocker ml(&cond_mu_);
done_ = true;
NaClXCondVarSignal(&buffer_cond_);
}
} // namespace plugin
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef NATIVE_CLIENT_SRC_TRUSTED_PLUGIN_PNACL_STREAMING_TRANSLATE_THREAD_H_
#define NATIVE_CLIENT_SRC_TRUSTED_PLUGIN_PNACL_STREAMING_TRANSLATE_THREAD_H_
#include <deque>
#include <vector>
#include "native_client/src/include/nacl_macros.h"
#include "native_client/src/shared/platform/nacl_sync_checked.h"
#include "native_client/src/trusted/plugin/pnacl_translate_thread.h"
namespace plugin {
class PnaclStreamingTranslateThread : public PnaclTranslateThread {
public:
PnaclStreamingTranslateThread();
virtual ~PnaclStreamingTranslateThread();
// Start the translation process. It will continue to run and consume data
// as it is passed in with PutBytes.
virtual void RunTranslate(const pp::CompletionCallback& finish_callback,
const Manifest* manifest,
const Manifest* ld_manifest,
TempFile* obj_file,
LocalTempFile* nexe_file,
ErrorInfo* error_info,
PnaclResources* resources,
Plugin* plugin);
// Kill the translation and/or linking processes.
virtual void SetSubprocessesShouldDie();
// Send bitcode bytes to the translator. Called from the main thread.
void PutBytes(std::vector<char>* data, int count);
private:
NACL_DISALLOW_COPY_AND_ASSIGN(PnaclStreamingTranslateThread);
// Run the streaming translation. Call on the translation/SRPC thread.
void DoTranslate();
bool done_;
// Condition variable to synchronize communication with the SRPC thread.
// SRPC thread waits on this condvar if data_buffers_ is empty (meaning
// there is no bitcode to send to the translator), and the main thread
// appends to data_buffers_ and signals it when it receives bitcode.
struct NaClCondVar buffer_cond_;
// Mutex for buffer_cond_.
struct NaClMutex cond_mu_;
// Data buffers from FileDownloader are enqueued here to pass from the
// main thread to the SRPC thread. Protected by cond_mu_
std::deque<std::vector<char> > data_buffers_;
};
} // namespace plugin
#endif // NATIVE_CLIENT_SRC_TRUSTED_PLUGIN_PNACL_STREAMING_TRANSLATE_THREAD_H_
......@@ -15,6 +15,7 @@ namespace plugin {
PnaclTranslateThread::PnaclTranslateThread() : subprocesses_should_die_(false),
current_rev_interface_(NULL),
done_(false),
manifest_(NULL),
ld_manifest_(NULL),
obj_file_(NULL),
......@@ -23,6 +24,77 @@ PnaclTranslateThread::PnaclTranslateThread() : subprocesses_should_die_(false),
resources_(NULL),
plugin_(NULL) {
NaClXMutexCtor(&subprocess_mu_);
NaClXMutexCtor(&cond_mu_);
NaClXCondVarCtor(&buffer_cond_);
}
void PnaclTranslateThread::RunTranslate(
const pp::CompletionCallback& finish_callback,
const Manifest* manifest,
const Manifest* ld_manifest,
TempFile* obj_file,
LocalTempFile* nexe_file,
ErrorInfo* error_info,
PnaclResources* resources,
Plugin* plugin) {
PLUGIN_PRINTF(("PnaclStreamingTranslateThread::RunTranslate)\n"));
manifest_ = manifest;
ld_manifest_ = ld_manifest;
obj_file_ = obj_file;
nexe_file_ = nexe_file;
coordinator_error_info_ = error_info;
resources_ = resources;
plugin_ = plugin;
// Invoke llc followed by ld off the main thread. This allows use of
// blocking RPCs that would otherwise block the JavaScript main thread.
report_translate_finished_ = finish_callback;
translate_thread_.reset(new NaClThread);
if (translate_thread_ == NULL) {
TranslateFailed("could not allocate thread struct.");
return;
}
const int32_t kArbitraryStackSize = 128 * 1024;
if (!NaClThreadCreateJoinable(translate_thread_.get(),
DoTranslateThread,
this,
kArbitraryStackSize)) {
TranslateFailed("could not create thread.");
translate_thread_.reset(NULL);
}
}
// Called from main thread to send bytes to the translator.
void PnaclTranslateThread::PutBytes(std::vector<char>* bytes,
int count) {
PLUGIN_PRINTF(("PutBytes, this %p bytes %p, size %d, count %d\n", this, bytes,
bytes ? bytes->size(): 0, count));
size_t buffer_size = 0;
// If we are done (error or not), Signal the translation thread to stop.
if (count <= PP_OK) {
NaClXMutexLock(&cond_mu_);
done_ = true;
NaClXCondVarSignal(&buffer_cond_);
NaClXMutexUnlock(&cond_mu_);
return;
}
CHECK(bytes != NULL);
// Ensure that the buffer we send to the translation thread is the right size
// (count can be < the buffer size). This can be done without the lock.
buffer_size = bytes->size();
bytes->resize(count);
NaClXMutexLock(&cond_mu_);
data_buffers_.push_back(std::vector<char>());
bytes->swap(data_buffers_.back()); // Avoid copying the buffer data.
NaClXCondVarSignal(&buffer_cond_);
NaClXMutexUnlock(&cond_mu_);
// Ensure the buffer we send back to the coordinator is the expected size
bytes->resize(buffer_size);
}
NaClSubprocess* PnaclTranslateThread::StartSubprocess(
......@@ -48,6 +120,99 @@ void WINAPI PnaclTranslateThread::DoTranslateThread(void* arg) {
translator->DoTranslate();
}
void PnaclTranslateThread::DoTranslate() {
ErrorInfo error_info;
nacl::scoped_ptr<NaClSubprocess> llc_subprocess(
StartSubprocess(PnaclUrls::GetLlcUrl(), manifest_, &error_info));
if (llc_subprocess == NULL) {
TranslateFailed("Compile process could not be created: " +
error_info.message());
return;
}
// Run LLC.
SrpcParams params;
nacl::DescWrapper* llc_out_file = obj_file_->get_wrapper();
PluginReverseInterface* llc_reverse =
llc_subprocess->service_runtime()->rev_interface();
llc_reverse->AddTempQuotaManagedFile(obj_file_->identifier());
RegisterReverseInterface(llc_reverse);
if (!llc_subprocess->InvokeSrpcMethod("StreamInit",
"h",
&params,
llc_out_file->desc())) {
// StreamInit returns an error message if the RPC fails.
TranslateFailed(nacl::string("Stream init failed: ") +
nacl::string(params.outs()[0]->arrays.str));
return;
}
PLUGIN_PRINTF(("PnaclCoordinator: StreamInit successful\n"));
// llc process is started.
while(!done_ || data_buffers_.size() > 0) {
NaClXMutexLock(&cond_mu_);
while(!done_ && data_buffers_.size() == 0) {
NaClXCondVarWait(&buffer_cond_, &cond_mu_);
}
PLUGIN_PRINTF(("PnaclTranslateThread awake, done %d, size %d\n",
done_, data_buffers_.size()));
if (data_buffers_.size() > 0) {
std::vector<char> data;
data.swap(data_buffers_.front());
data_buffers_.pop_front();
NaClXMutexUnlock(&cond_mu_);
PLUGIN_PRINTF(("StreamChunk\n"));
if (!llc_subprocess->InvokeSrpcMethod("StreamChunk",
"C",
&params,
&data[0],
data.size())) {
TranslateFailed("Compile stream chunk failed.");
return;
}
PLUGIN_PRINTF(("StreamChunk Successful\n"));
} else {
NaClXMutexUnlock(&cond_mu_);
}
if (SubprocessesShouldDie()) {
TranslateFailed("Stopped by coordinator.");
return;
}
}
PLUGIN_PRINTF(("PnaclTranslateThread done with chunks\n"));
// Finish llc.
if(!llc_subprocess->InvokeSrpcMethod("StreamEnd",
"",
&params)) {
PLUGIN_PRINTF(("PnaclTranslateThread StreamEnd failed\n"));
TranslateFailed(params.outs()[3]->arrays.str);
return;
}
// LLC returns values that are used to determine how linking is done.
int is_shared_library = (params.outs()[0]->u.ival != 0);
nacl::string soname = params.outs()[1]->arrays.str;
nacl::string lib_dependencies = params.outs()[2]->arrays.str;
PLUGIN_PRINTF(("PnaclCoordinator: compile (translator=%p) succeeded"
" is_shared_library=%d, soname='%s', lib_dependencies='%s')\n",
this, is_shared_library, soname.c_str(),
lib_dependencies.c_str()));
// Shut down the llc subprocess.
RegisterReverseInterface(NULL);
llc_subprocess.reset(NULL);
if (SubprocessesShouldDie()) {
TranslateFailed("stopped by coordinator.");
return;
}
if(!RunLdSubprocess(is_shared_library, soname, lib_dependencies)) {
return;
}
pp::Core* core = pp::Module::Get()->core();
core->CallOnMainThread(0, report_translate_finished_, PP_OK);
}
bool PnaclTranslateThread::RunLdSubprocess(int is_shared_library,
const nacl::string& soname,
const nacl::string& lib_dependencies
......@@ -132,15 +297,23 @@ bool PnaclTranslateThread::SubprocessesShouldDie() {
void PnaclTranslateThread::SetSubprocessesShouldDie() {
PLUGIN_PRINTF(("PnaclTranslateThread::SetSubprocessesShouldDie\n"));
nacl::MutexLocker ml(&subprocess_mu_);
NaClXMutexLock(&subprocess_mu_);
subprocesses_should_die_ = true;
if (current_rev_interface_) {
current_rev_interface_->ShutDown();
current_rev_interface_ = NULL;
}
NaClXMutexUnlock(&subprocess_mu_);
nacl::MutexLocker ml(&cond_mu_);
done_ = true;
NaClXCondVarSignal(&buffer_cond_);
}
PnaclTranslateThread::~PnaclTranslateThread() {
PLUGIN_PRINTF(("~PnaclTranslateThread (translate_thread=%p)\n", this));
SetSubprocessesShouldDie();
NaClThreadJoin(translate_thread_.get());
PLUGIN_PRINTF(("~PnaclTranslateThread joined\n"));
NaClMutexDtor(&subprocess_mu_);
}
......
......@@ -5,6 +5,9 @@
#ifndef NATIVE_CLIENT_SRC_TRUSTED_PLUGIN_PNACL_TRANSLATE_THREAD_H_
#define NATIVE_CLIENT_SRC_TRUSTED_PLUGIN_PNACL_TRANSLATE_THREAD_H_
#include <deque>
#include <vector>
#include "native_client/src/include/nacl_macros.h"
#include "native_client/src/include/nacl_scoped_ptr.h"
#include "native_client/src/include/nacl_string.h"
......@@ -32,22 +35,29 @@ class TempFile;
class PnaclTranslateThread {
public:
PnaclTranslateThread();
virtual ~PnaclTranslateThread();
~PnaclTranslateThread();
virtual void RunTranslate(const pp::CompletionCallback& finish_callback,
// Start the translation process. It will continue to run and consume data
// as it is passed in with PutBytes.
void RunTranslate(const pp::CompletionCallback& finish_callback,
const Manifest* manifest,
const Manifest* ld_manifest,
TempFile* obj_file,
LocalTempFile* nexe_file,
ErrorInfo* error_info,
PnaclResources* resources,
Plugin* plugin) = 0;
Plugin* plugin);
// Signal the translate thread and subprocesses that they should stop.
void SetSubprocessesShouldDie();
// Send bitcode bytes to the translator. Called from the main thread.
void PutBytes(std::vector<char>* data, int count);
private:
// Returns true if the translate thread and subprocesses should stop.
bool SubprocessesShouldDie();
// Signal the translate thread and subprocesses that they should stop.
virtual void SetSubprocessesShouldDie();
protected:
// Starts an individual llc or ld subprocess used for translation.
NaClSubprocess* StartSubprocess(const nacl::string& url,
const Manifest* manifest,
......@@ -55,8 +65,8 @@ class PnaclTranslateThread {
// Helper thread entry point for translation. Takes a pointer to
// PnaclTranslateThread and calls DoTranslate().
static void WINAPI DoTranslateThread(void* arg);
// Runs the SRPCs that control translation. Called from the helper thread.
virtual void DoTranslate() = 0;
// Runs the streaming translation. Called from the helper thread.
void DoTranslate() ;
// Signal that Pnacl translation failed, from the translation thread only.
void TranslateFailed(const nacl::string& error_string);
// Run the LD subprocess, returning true on success
......@@ -85,6 +95,22 @@ class PnaclTranslateThread {
// Reverse interface to shutdown on SetSubprocessesShouldDie
PluginReverseInterface* current_rev_interface_;
// Condition variable to synchronize communication with the SRPC thread.
// SRPC thread waits on this condvar if data_buffers_ is empty (meaning
// there is no bitcode to send to the translator), and the main thread
// appends to data_buffers_ and signals it when it receives bitcode.
struct NaClCondVar buffer_cond_;
// Mutex for buffer_cond_.
struct NaClMutex cond_mu_;
// Data buffers from FileDownloader are enqueued here to pass from the
// main thread to the SRPC thread. Protected by cond_mu_
std::deque<std::vector<char> > data_buffers_;
// Whether all data has been downloaded and copied to translation thread.
// Associated with buffer_cond_
bool done_;
// Data about the translation files, owned by the coordinator
const Manifest* manifest_;
const Manifest* ld_manifest_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment