Commit 4b71a721 authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Update JingleThreadWrapper to allow it to be created using task runner.

Previously JingleThreadWrapper required a MessageLoop. In NaCl
we don't have MessageLoop for the main plugin thread, but have a task runner.
This CL allows to use JingleThreadWrapper on the main NaCl thread.

BUG=134216
TBR=tommi@chromium.org

Review URL: https://chromiumcodereview.appspot.com/10823224

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@151458 0039d316-1c4b-4281-b951-d872f2087c98
parent 2a8d287c
...@@ -370,7 +370,7 @@ void MediaStreamImpl::OnSocketDispatcherDestroyed() { ...@@ -370,7 +370,7 @@ void MediaStreamImpl::OnSocketDispatcherDestroyed() {
void MediaStreamImpl::InitializeWorkerThread(talk_base::Thread** thread, void MediaStreamImpl::InitializeWorkerThread(talk_base::Thread** thread,
base::WaitableEvent* event) { base::WaitableEvent* event) {
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
jingle_glue::JingleThreadWrapper::current()->set_send_allowed(true); jingle_glue::JingleThreadWrapper::current()->set_send_allowed(true);
*thread = jingle_glue::JingleThreadWrapper::current(); *thread = jingle_glue::JingleThreadWrapper::current();
event->Signal(); event->Signal();
...@@ -392,7 +392,7 @@ void MediaStreamImpl::DeleteIpcNetworkManager() { ...@@ -392,7 +392,7 @@ void MediaStreamImpl::DeleteIpcNetworkManager() {
bool MediaStreamImpl::EnsurePeerConnectionFactory() { bool MediaStreamImpl::EnsurePeerConnectionFactory() {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
if (!signaling_thread_) { if (!signaling_thread_) {
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
jingle_glue::JingleThreadWrapper::current()->set_send_allowed(true); jingle_glue::JingleThreadWrapper::current()->set_send_allowed(true);
signaling_thread_ = jingle_glue::JingleThreadWrapper::current(); signaling_thread_ = jingle_glue::JingleThreadWrapper::current();
} }
......
...@@ -58,7 +58,7 @@ bool P2PTransportImpl::Init(WebKit::WebFrame* web_frame, ...@@ -58,7 +58,7 @@ bool P2PTransportImpl::Init(WebKit::WebFrame* web_frame,
// Before proceeding, ensure we have libjingle thread wrapper for // Before proceeding, ensure we have libjingle thread wrapper for
// the current thread. // the current thread.
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
name_ = name; name_ = name;
event_handler_ = event_handler; event_handler_ = event_handler;
......
...@@ -303,7 +303,7 @@ class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> { ...@@ -303,7 +303,7 @@ class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> {
class PseudoTcpAdapterTest : public testing::Test { class PseudoTcpAdapterTest : public testing::Test {
protected: protected:
virtual void SetUp() OVERRIDE { virtual void SetUp() OVERRIDE {
JingleThreadWrapper::EnsureForCurrentThread(); JingleThreadWrapper::EnsureForCurrentMessageLoop();
host_socket_ = new FakeSocket(); host_socket_ = new FakeSocket();
client_socket_ = new FakeSocket(); client_socket_ = new FakeSocket();
......
...@@ -29,10 +29,12 @@ base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > ...@@ -29,10 +29,12 @@ base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
// static // static
void JingleThreadWrapper::EnsureForCurrentThread() { void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
if (JingleThreadWrapper::current() == NULL) { if (JingleThreadWrapper::current() == NULL) {
g_jingle_thread_wrapper.Get().Set( MessageLoop* message_loop = MessageLoop::current();
new JingleThreadWrapper(MessageLoop::current())); g_jingle_thread_wrapper.Get().Set(new JingleThreadWrapper(
message_loop->message_loop_proxy()));
message_loop->AddDestructionObserver(current());
} }
DCHECK_EQ(talk_base::Thread::Current(), current()); DCHECK_EQ(talk_base::Thread::Current(), current());
...@@ -43,19 +45,18 @@ JingleThreadWrapper* JingleThreadWrapper::current() { ...@@ -43,19 +45,18 @@ JingleThreadWrapper* JingleThreadWrapper::current() {
return g_jingle_thread_wrapper.Get().Get(); return g_jingle_thread_wrapper.Get().Get();
} }
JingleThreadWrapper::JingleThreadWrapper(MessageLoop* message_loop) JingleThreadWrapper::JingleThreadWrapper(
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: talk_base::Thread(new talk_base::NullSocketServer()), : talk_base::Thread(new talk_base::NullSocketServer()),
message_loop_(message_loop), task_runner_(task_runner),
send_allowed_(false), send_allowed_(false),
last_task_id_(0), last_task_id_(0),
pending_send_event_(true, false) { pending_send_event_(true, false),
DCHECK_EQ(message_loop_, MessageLoop::current()); weak_ptr_factory_(this),
weak_ptr_(weak_ptr_factory_.GetWeakPtr()) {
talk_base::ThreadManager::Instance()->UnwrapCurrentThread(); DCHECK(task_runner->BelongsToCurrentThread());
talk_base::ThreadManager::Instance()->SetCurrentThread(this); DCHECK(!talk_base::Thread::Current());
talk_base::MessageQueueManager::Instance()->Add(this); talk_base::MessageQueueManager::Instance()->Add(this);
message_loop_->AddDestructionObserver(this);
WrapCurrent(); WrapCurrent();
} }
...@@ -69,7 +70,6 @@ void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { ...@@ -69,7 +70,6 @@ void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
g_jingle_thread_wrapper.Get().Set(NULL); g_jingle_thread_wrapper.Get().Set(NULL);
talk_base::ThreadManager::Instance()->SetCurrentThread(NULL); talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
talk_base::MessageQueueManager::Instance()->Remove(this); talk_base::MessageQueueManager::Instance()->Remove(this);
message_loop_->RemoveDestructionObserver(this);
talk_base::SocketServer* ss = socketserver(); talk_base::SocketServer* ss = socketserver();
delete this; delete this;
delete ss; delete ss;
...@@ -162,9 +162,9 @@ void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id, ...@@ -162,9 +162,9 @@ void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id,
// Need to signal |pending_send_event_| here in case the thread is // Need to signal |pending_send_event_| here in case the thread is
// sending message to another thread. // sending message to another thread.
pending_send_event_.Signal(); pending_send_event_.Signal();
message_loop_->PostTask(FROM_HERE, task_runner_->PostTask(FROM_HERE,
base::Bind(&JingleThreadWrapper::ProcessPendingSends, base::Bind(&JingleThreadWrapper::ProcessPendingSends,
base::Unretained(this))); weak_ptr_));
while (!pending_send.done_event.IsSignaled()) { while (!pending_send.done_event.IsSignaled()) {
...@@ -214,14 +214,14 @@ void JingleThreadWrapper::PostTaskInternal( ...@@ -214,14 +214,14 @@ void JingleThreadWrapper::PostTaskInternal(
} }
if (delay_ms <= 0) { if (delay_ms <= 0) {
message_loop_->PostTask(FROM_HERE, task_runner_->PostTask(FROM_HERE,
base::Bind(&JingleThreadWrapper::RunTask, base::Bind(&JingleThreadWrapper::RunTask,
base::Unretained(this), task_id)); weak_ptr_, task_id));
} else { } else {
message_loop_->PostDelayedTask(FROM_HERE, task_runner_->PostDelayedTask(FROM_HERE,
base::Bind(&JingleThreadWrapper::RunTask, base::Bind(&JingleThreadWrapper::RunTask,
base::Unretained(this), task_id), weak_ptr_, task_id),
base::TimeDelta::FromMilliseconds(delay_ms)); base::TimeDelta::FromMilliseconds(delay_ms));
} }
} }
......
...@@ -16,25 +16,34 @@ ...@@ -16,25 +16,34 @@
namespace jingle_glue { namespace jingle_glue {
// JingleThreadWrapper wraps Chromium threads using talk_base::Thread // JingleThreadWrapper implements talk_base::Thread interface on top of
// interface. The object must be created by calling // Chromium's SingleThreadTaskRunner interface. Currently only the bare minimum
// EnsureForCurrentThread(). Each JingleThreadWrapper deletes itself // that is used by P2P part of libjingle is implemented. There are two ways to
// when MessageLoop is destroyed. Currently only the bare minimum that // create this object:
// is used by P2P part of libjingle is implemented. //
// - Call EnsureForCurrentMessageLoop(). This approach works only on threads
// that have MessageLoop In this case JingleThreadWrapper deletes itself
// automatically when MessageLoop is destroyed.
// - Using JingleThreadWrapper() constructor. In this case the creating code
// must pass a valid task runner for the current thread and also delete the
// wrapper later.
class JingleThreadWrapper class JingleThreadWrapper
: public MessageLoop::DestructionObserver, : public MessageLoop::DestructionObserver,
public talk_base::Thread { public talk_base::Thread {
public: public:
// Create JingleThreadWrapper for the current thread if it hasn't // Create JingleThreadWrapper for the current thread if it hasn't been created
// been created yet. // yet. The thread wrapper is destroyed automatically when the current
static void EnsureForCurrentThread(); // MessageLoop is destroyed.
static void EnsureForCurrentMessageLoop();
// Returns thread wrapper for the current thread. NULL is returned // Returns thread wrapper for the current thread. NULL is returned
// if EnsureForCurrentThread() has never been called for this // if EnsureForCurrentMessageLoop() has never been called for this
// thread. // thread.
static JingleThreadWrapper* current(); static JingleThreadWrapper* current();
JingleThreadWrapper(MessageLoop* message_loop); explicit JingleThreadWrapper(
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
virtual ~JingleThreadWrapper();
// Sets whether the thread can be used to send messages // Sets whether the thread can be used to send messages
// synchronously to another thread using Send() method. Set to false // synchronously to another thread using Send() method. Set to false
...@@ -90,16 +99,14 @@ class JingleThreadWrapper ...@@ -90,16 +99,14 @@ class JingleThreadWrapper
typedef std::map<int, talk_base::Message> MessagesQueue; typedef std::map<int, talk_base::Message> MessagesQueue;
struct PendingSend; struct PendingSend;
virtual ~JingleThreadWrapper();
void PostTaskInternal( void PostTaskInternal(
int delay_ms, talk_base::MessageHandler* handler, int delay_ms, talk_base::MessageHandler* handler,
uint32 message_id, talk_base::MessageData* data); uint32 message_id, talk_base::MessageData* data);
void RunTask(int task_id); void RunTask(int task_id);
void ProcessPendingSends(); void ProcessPendingSends();
// Chromium thread used to execute messages posted on this thread. // Task runner used to execute messages posted on this thread.
MessageLoop* message_loop_; scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
bool send_allowed_; bool send_allowed_;
...@@ -109,8 +116,13 @@ class JingleThreadWrapper ...@@ -109,8 +116,13 @@ class JingleThreadWrapper
MessagesQueue messages_; MessagesQueue messages_;
std::list<PendingSend*> pending_send_messages_; std::list<PendingSend*> pending_send_messages_;
base::WaitableEvent pending_send_event_; base::WaitableEvent pending_send_event_;
base::WeakPtrFactory<JingleThreadWrapper> weak_ptr_factory_;
base::WeakPtr<JingleThreadWrapper> weak_ptr_;
DISALLOW_COPY_AND_ASSIGN(JingleThreadWrapper);
}; };
} } // namespace jingle_glue
#endif // JINGLE_GLUE_THREAD_WRAPPER_H_ #endif // JINGLE_GLUE_THREAD_WRAPPER_H_
...@@ -81,7 +81,7 @@ class ThreadWrapperTest : public testing::Test { ...@@ -81,7 +81,7 @@ class ThreadWrapperTest : public testing::Test {
} }
virtual void SetUp() OVERRIDE { virtual void SetUp() OVERRIDE {
JingleThreadWrapper::EnsureForCurrentThread(); JingleThreadWrapper::EnsureForCurrentMessageLoop();
thread_ = talk_base::Thread::Current(); thread_ = talk_base::Thread::Current();
} }
...@@ -230,7 +230,7 @@ TEST_F(ThreadWrapperTest, SendSameThread) { ...@@ -230,7 +230,7 @@ TEST_F(ThreadWrapperTest, SendSameThread) {
void InitializeWrapperForNewThread(talk_base::Thread** thread, void InitializeWrapperForNewThread(talk_base::Thread** thread,
base::WaitableEvent* done_event) { base::WaitableEvent* done_event) {
JingleThreadWrapper::EnsureForCurrentThread(); JingleThreadWrapper::EnsureForCurrentMessageLoop();
JingleThreadWrapper::current()->set_send_allowed(true); JingleThreadWrapper::current()->set_send_allowed(true);
*thread = JingleThreadWrapper::current(); *thread = JingleThreadWrapper::current();
done_event->Signal(); done_event->Signal();
......
...@@ -467,7 +467,7 @@ void ChromotingInstance::OnFirstFrameReceived() { ...@@ -467,7 +467,7 @@ void ChromotingInstance::OnFirstFrameReceived() {
void ChromotingInstance::Connect(const ClientConfig& config) { void ChromotingInstance::Connect(const ClientConfig& config) {
DCHECK(plugin_task_runner_->BelongsToCurrentThread()); DCHECK(plugin_task_runner_->BelongsToCurrentThread());
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
host_connection_.reset(new protocol::ConnectionToHost(true)); host_connection_.reset(new protocol::ConnectionToHost(true));
scoped_ptr<AudioPlayer> audio_player(new PepperAudioPlayer(this)); scoped_ptr<AudioPlayer> audio_player(new PepperAudioPlayer(this));
......
...@@ -312,7 +312,7 @@ LibjingleTransportFactory::LibjingleTransportFactory( ...@@ -312,7 +312,7 @@ LibjingleTransportFactory::LibjingleTransportFactory(
: http_port_allocator_(port_allocator.get()), : http_port_allocator_(port_allocator.get()),
port_allocator_(port_allocator.Pass()), port_allocator_(port_allocator.Pass()),
incoming_only_(incoming_only) { incoming_only_(incoming_only) {
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
} }
LibjingleTransportFactory::LibjingleTransportFactory() LibjingleTransportFactory::LibjingleTransportFactory()
...@@ -322,7 +322,7 @@ LibjingleTransportFactory::LibjingleTransportFactory() ...@@ -322,7 +322,7 @@ LibjingleTransportFactory::LibjingleTransportFactory()
port_allocator_(new cricket::BasicPortAllocator( port_allocator_(new cricket::BasicPortAllocator(
network_manager_.get(), socket_factory_.get())), network_manager_.get(), socket_factory_.get())),
incoming_only_(false) { incoming_only_(false) {
jingle_glue::JingleThreadWrapper::EnsureForCurrentThread(); jingle_glue::JingleThreadWrapper::EnsureForCurrentMessageLoop();
port_allocator_->set_flags( port_allocator_->set_flags(
cricket::PORTALLOCATOR_DISABLE_TCP | cricket::PORTALLOCATOR_DISABLE_TCP |
cricket::PORTALLOCATOR_DISABLE_STUN | cricket::PORTALLOCATOR_DISABLE_STUN |
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment