Commit 262060ff authored by jhawkins@chromium.org's avatar jhawkins@chromium.org

ChromeFrame: Convert TaskMarshallerThroughMessageQueue to new Callback system.

BUG=none
TEST=none

R=willchan@chromium.org,stoyan@chromium.org

Review URL: http://codereview.chromium.org/8591009

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@110595 0039d316-1c4b-4281-b951-d872f2087c98
parent c2632ab1
...@@ -378,9 +378,6 @@ class BASE_EXPORT MessageLoop : public base::MessagePump::Delegate { ...@@ -378,9 +378,6 @@ class BASE_EXPORT MessageLoop : public base::MessagePump::Delegate {
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
protected: protected:
// PendingTasks are sorted by their |delayed_run_time| property.
typedef std::priority_queue<base::PendingTask> DelayedTaskQueue;
struct RunState { struct RunState {
// Used to count how many Run() invocations are on the stack. // Used to count how many Run() invocations are on the stack.
int run_depth; int run_depth;
...@@ -491,7 +488,7 @@ class BASE_EXPORT MessageLoop : public base::MessagePump::Delegate { ...@@ -491,7 +488,7 @@ class BASE_EXPORT MessageLoop : public base::MessagePump::Delegate {
base::TaskQueue work_queue_; base::TaskQueue work_queue_;
// Contains delayed tasks, sorted by their 'delayed_run_time' property. // Contains delayed tasks, sorted by their 'delayed_run_time' property.
DelayedTaskQueue delayed_work_queue_; base::DelayedTaskQueue delayed_work_queue_;
// A recent snapshot of Time::Now(), used to check delayed_work_queue_. // A recent snapshot of Time::Now(), used to check delayed_work_queue_.
base::TimeTicks recent_time_; base::TimeTicks recent_time_;
......
...@@ -14,7 +14,7 @@ PendingTask::PendingTask(const tracked_objects::Location& posted_from, ...@@ -14,7 +14,7 @@ PendingTask::PendingTask(const tracked_objects::Location& posted_from,
task(task), task(task),
posted_from(posted_from), posted_from(posted_from),
sequence_num(0), sequence_num(0),
nestable(false) { nestable(true) {
} }
PendingTask::PendingTask(const tracked_objects::Location& posted_from, PendingTask::PendingTask(const tracked_objects::Location& posted_from,
......
...@@ -49,6 +49,9 @@ class TaskQueue : public std::queue<PendingTask> { ...@@ -49,6 +49,9 @@ class TaskQueue : public std::queue<PendingTask> {
void Swap(TaskQueue* queue); void Swap(TaskQueue* queue);
}; };
// PendingTasks are sorted by their |delayed_run_time| property.
typedef std::priority_queue<base::PendingTask> DelayedTaskQueue;
} // namespace base } // namespace base
#endif // PENDING_TASK_H_ #endif // PENDING_TASK_H_
...@@ -131,10 +131,8 @@ PosixDynamicThreadPool::PosixDynamicThreadPool( ...@@ -131,10 +131,8 @@ PosixDynamicThreadPool::PosixDynamicThreadPool(
num_idle_threads_cv_(NULL) {} num_idle_threads_cv_(NULL) {}
PosixDynamicThreadPool::~PosixDynamicThreadPool() { PosixDynamicThreadPool::~PosixDynamicThreadPool() {
while (!pending_tasks_.empty()) { while (!pending_tasks_.empty())
PendingTask pending_task = pending_tasks_.front();
pending_tasks_.pop(); pending_tasks_.pop();
}
} }
void PosixDynamicThreadPool::Terminate() { void PosixDynamicThreadPool::Terminate() {
......
...@@ -172,8 +172,6 @@ class CFProxy : public Interface2IPCMessage, ...@@ -172,8 +172,6 @@ class CFProxy : public Interface2IPCMessage,
bool is_connected_; bool is_connected_;
}; };
DISABLE_RUNNABLE_METHOD_REFCOUNT(CFProxy);
// Support functions. // Support functions.
std::string GenerateChannelId(); std::string GenerateChannelId();
std::wstring BuildCmdLine(const std::string& channel_id, std::wstring BuildCmdLine(const std::string& channel_id,
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include "chrome_frame/cfproxy_private.h" #include "chrome_frame/cfproxy_private.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/tuple.h" #include "base/tuple.h"
#include "ipc/ipc_sync_message.h" #include "ipc/ipc_sync_message.h"
#include "chrome/common/automation_messages.h" #include "chrome/common/automation_messages.h"
...@@ -17,28 +19,31 @@ CFProxy::CFProxy(CFProxyTraits* api) : ipc_thread_("ipc"), ...@@ -17,28 +19,31 @@ CFProxy::CFProxy(CFProxyTraits* api) : ipc_thread_("ipc"),
} }
CFProxy::~CFProxy() { CFProxy::~CFProxy() {
ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, ipc_thread_.message_loop()->PostTask(
&CFProxy::CleanupOnIoThread)); FROM_HERE,
base::Bind(&CFProxy::CleanupOnIoThread, base::Unretained(this)));
// ipc_thread destructor will do the Stop anyway. this is for debug :) // ipc_thread destructor will do the Stop anyway. this is for debug :)
ipc_thread_.Stop(); ipc_thread_.Stop();
} }
void CFProxy::Init(const ProxyParams& params) { void CFProxy::Init(const ProxyParams& params) {
ipc_thread_.StartWithOptions(base::Thread::Options(MessageLoop::TYPE_IO, 0)); ipc_thread_.StartWithOptions(base::Thread::Options(MessageLoop::TYPE_IO, 0));
ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, ipc_thread_.message_loop()->PostTask(
&CFProxy::InitInIoThread, params)); FROM_HERE,
base::Bind(&CFProxy::InitInIoThread, base::Unretained(this), params));
} }
int CFProxy::AddDelegate(ChromeProxyDelegate* delegate) { int CFProxy::AddDelegate(ChromeProxyDelegate* delegate) {
ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, ipc_thread_.message_loop()->PostTask(
&CFProxy::AddDelegateOnIoThread, delegate)); FROM_HERE, base::Bind(&CFProxy::AddDelegateOnIoThread,
base::Unretained(this), delegate));
return ++delegate_count_; return ++delegate_count_;
} }
int CFProxy::RemoveDelegate(ChromeProxyDelegate* delegate) { int CFProxy::RemoveDelegate(ChromeProxyDelegate* delegate) {
ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, ipc_thread_.message_loop()->PostTask(
&CFProxy::RemoveDelegateOnIoThread, delegate)); FROM_HERE, base::Bind(&CFProxy::RemoveDelegateOnIoThread,
base::Unretained(this), delegate));
return --delegate_count_; return --delegate_count_;
} }
...@@ -65,9 +70,8 @@ void CFProxy::InitInIoThread(const ProxyParams& params) { ...@@ -65,9 +70,8 @@ void CFProxy::InitInIoThread(const ProxyParams& params) {
std::wstring cmd_line = BuildCmdLine(channel_id, params.profile_path, std::wstring cmd_line = BuildCmdLine(channel_id, params.profile_path,
params.extra_params); params.extra_params);
if (!cmd_line.empty() && api_->LaunchApp(cmd_line)) { if (!cmd_line.empty() && api_->LaunchApp(cmd_line)) {
CancelableTask* launch_timeout = NewRunnableMethod(this, ipc_thread_.message_loop()->PostDelayedTask(
&CFProxy::LaunchTimeOut); FROM_HERE, base::Bind(&CFProxy::LaunchTimeOut, base::Unretained(this)),
ipc_thread_.message_loop()->PostDelayedTask(FROM_HERE, launch_timeout,
params.timeout.InMilliseconds()); params.timeout.InMilliseconds());
} else { } else {
OnPeerLost(ChromeProxyDelegate::CHROME_EXE_LAUNCH_FAILED); OnPeerLost(ChromeProxyDelegate::CHROME_EXE_LAUNCH_FAILED);
...@@ -110,8 +114,9 @@ void CFProxy::OnPeerLost(ChromeProxyDelegate::DisconnectReason reason) { ...@@ -110,8 +114,9 @@ void CFProxy::OnPeerLost(ChromeProxyDelegate::DisconnectReason reason) {
} }
void CFProxy::SendIpcMessage(IPC::Message* m) { void CFProxy::SendIpcMessage(IPC::Message* m) {
ipc_thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, ipc_thread_.message_loop()->PostTask(
&CFProxy::SendIpcMessageOnIoThread, m)); FROM_HERE, base::Bind(&CFProxy::SendIpcMessageOnIoThread,
base::Unretained(this), m));
} }
void CFProxy::SendIpcMessageOnIoThread(IPC::Message* m) { void CFProxy::SendIpcMessageOnIoThread(IPC::Message* m) {
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h" #include "base/lazy_instance.h"
#include "base/location.h" #include "base/location.h"
#include "chrome_frame/external_tab.h" #include "chrome_frame/external_tab.h"
...@@ -11,9 +13,6 @@ ...@@ -11,9 +13,6 @@
#include "chrome_frame/chrome_frame_delegate.h" #include "chrome_frame/chrome_frame_delegate.h"
#include "chrome_frame/utils.h" #include "chrome_frame/utils.h"
DISABLE_RUNNABLE_METHOD_REFCOUNT(ExternalTabProxy);
DISABLE_RUNNABLE_METHOD_REFCOUNT(UIDelegate);
namespace { namespace {
static base::LazyInstance<ChromeProxyFactory> g_proxy_factory = static base::LazyInstance<ChromeProxyFactory> g_proxy_factory =
LAZY_INSTANCE_INITIALIZER; LAZY_INSTANCE_INITIALIZER;
...@@ -115,8 +114,8 @@ void ExternalTabProxy::CreateTab(const CreateTabParams& create_params, ...@@ -115,8 +114,8 @@ void ExternalTabProxy::CreateTab(const CreateTabParams& create_params,
void ExternalTabProxy::Connected(ChromeProxy* proxy) { void ExternalTabProxy::Connected(ChromeProxy* proxy) {
// in ipc thread // in ipc thread
ui_.PostTask(FROM_HERE, NewRunnableMethod(this, ui_.PostTask(FROM_HERE, base::Bind(&ExternalTabProxy::UiConnected,
&ExternalTabProxy::UiConnected, proxy)); base::Unretained(this), proxy));
} }
void ExternalTabProxy::UiConnected(ChromeProxy* proxy) { void ExternalTabProxy::UiConnected(ChromeProxy* proxy) {
...@@ -147,8 +146,8 @@ void ExternalTabProxy::Disconnected() { ...@@ -147,8 +146,8 @@ void ExternalTabProxy::Disconnected() {
} }
void ExternalTabProxy::PeerLost(ChromeProxy* proxy, DisconnectReason reason) { void ExternalTabProxy::PeerLost(ChromeProxy* proxy, DisconnectReason reason) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(this, &ExternalTabProxy::UiPeerLost, ui_.PostTask(FROM_HERE, base::Bind(&ExternalTabProxy::UiPeerLost,
proxy, reason)); base::Unretained(this), proxy, reason));
} }
void ExternalTabProxy::UiPeerLost(ChromeProxy* proxy, DisconnectReason reason) { void ExternalTabProxy::UiPeerLost(ChromeProxy* proxy, DisconnectReason reason) {
...@@ -231,9 +230,10 @@ void ExternalTabProxy::Completed_CreateTab(bool success, HWND chrome_wnd, ...@@ -231,9 +230,10 @@ void ExternalTabProxy::Completed_CreateTab(bool success, HWND chrome_wnd,
HWND tab_window, int tab_handle, HWND tab_window, int tab_handle,
int session_id) { int session_id) {
// in ipc_thread. // in ipc_thread.
ui_.PostTask(FROM_HERE, NewRunnableMethod(this, ui_.PostTask(
&ExternalTabProxy::UiCompleted_CreateTab, FROM_HERE, base::Bind(&ExternalTabProxy::UiCompleted_CreateTab,
success, chrome_wnd, tab_window, tab_handle, session_id)); base::Unretained(this), success, chrome_wnd,
tab_window, tab_handle, session_id));
} }
void ExternalTabProxy::Completed_ConnectToTab( void ExternalTabProxy::Completed_ConnectToTab(
...@@ -250,60 +250,67 @@ void ExternalTabProxy::Completed_Navigate( ...@@ -250,60 +250,67 @@ void ExternalTabProxy::Completed_Navigate(
void ExternalTabProxy::OnNavigationStateChanged( void ExternalTabProxy::OnNavigationStateChanged(
int flags, const NavigationInfo& nav_info) { int flags, const NavigationInfo& nav_info) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE,
&UIDelegate::OnNavigationStateChanged, flags, nav_info)); base::Bind(&UIDelegate::OnNavigationStateChanged,
base::Unretained(ui_delegate_), flags, nav_info));
} }
void ExternalTabProxy::OnUpdateTargetUrl(const std::wstring& url) { void ExternalTabProxy::OnUpdateTargetUrl(const std::wstring& url) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE, base::Bind(&UIDelegate::OnUpdateTargetUrl,
&UIDelegate::OnUpdateTargetUrl, url)); base::Unretained(ui_delegate_), url));
} }
void ExternalTabProxy::OnTabLoaded(const GURL& url) { void ExternalTabProxy::OnTabLoaded(const GURL& url) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE, base::Bind(&UIDelegate::OnLoad,
&UIDelegate::OnLoad, url)); base::Unretained(ui_delegate_), url));
} }
void ExternalTabProxy::OnMoveWindow(const gfx::Rect& pos) { void ExternalTabProxy::OnMoveWindow(const gfx::Rect& pos) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE, base::Bind(&UIDelegate::OnMoveWindow,
&UIDelegate::OnMoveWindow, pos)); base::Unretained(ui_delegate_), pos));
} }
void ExternalTabProxy::OnMessageToHost(const std::string& message, void ExternalTabProxy::OnMessageToHost(const std::string& message,
const std::string& origin, const std::string& origin,
const std::string& target) { const std::string& target) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(
&UIDelegate::OnMessageFromChromeFrame, message, origin, target)); FROM_HERE,
base::Bind(&UIDelegate::OnMessageFromChromeFrame,
base::Unretained(ui_delegate_), message, origin, target));
} }
void ExternalTabProxy::OnHandleAccelerator(const MSG& accel_message) { void ExternalTabProxy::OnHandleAccelerator(const MSG& accel_message) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE,
&UIDelegate::OnHandleAccelerator, accel_message)); base::Bind(&UIDelegate::OnHandleAccelerator,
base::Unretained(ui_delegate_), accel_message));
} }
void ExternalTabProxy::OnHandleContextMenu( void ExternalTabProxy::OnHandleContextMenu(
const ContextMenuModel& context_menu_model, const ContextMenuModel& context_menu_model,
int align_flags, int align_flags,
const MiniContextMenuParams& params) { const MiniContextMenuParams& params) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE,
&UIDelegate::OnHandleContextMenu, context_menu_model, align_flags, base::Bind(&UIDelegate::OnHandleContextMenu,
params)); base::Unretained(ui_delegate_), context_menu_model,
align_flags, params));
} }
void ExternalTabProxy::OnTabbedOut(bool reverse) { void ExternalTabProxy::OnTabbedOut(bool reverse) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE, base::Bind(&UIDelegate::OnTabbedOut,
&UIDelegate::OnTabbedOut, reverse)); base::Unretained(ui_delegate_), reverse));
} }
void ExternalTabProxy::OnGoToHistoryOffset(int offset) { void ExternalTabProxy::OnGoToHistoryOffset(int offset) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(FROM_HERE, base::Bind(&UIDelegate::OnGoToHistoryOffset,
&UIDelegate::OnGoToHistoryOffset, offset)); base::Unretained(ui_delegate_), offset));
} }
void ExternalTabProxy::OnOpenURL(const GURL& url_to_open, const GURL& referrer, void ExternalTabProxy::OnOpenURL(const GURL& url_to_open, const GURL& referrer,
int open_disposition) { int open_disposition) {
ui_.PostTask(FROM_HERE, NewRunnableMethod(ui_delegate_, ui_.PostTask(
&UIDelegate::OnOpenURL, url_to_open, referrer, open_disposition)); FROM_HERE,
base::Bind(&UIDelegate::OnOpenURL, base::Unretained(ui_delegate_),
url_to_open, referrer, open_disposition));
} }
void ExternalTabProxy::OnNavigationFailed(int error_code, const GURL& gurl) { void ExternalTabProxy::OnNavigationFailed(int error_code, const GURL& gurl) {
......
...@@ -5,17 +5,17 @@ ...@@ -5,17 +5,17 @@
#include "chrome_frame/task_marshaller.h" #include "chrome_frame/task_marshaller.h"
#include "base/task.h" #include "base/task.h"
TaskMarshallerThroughMessageQueue::TaskMarshallerThroughMessageQueue() { TaskMarshallerThroughMessageQueue::TaskMarshallerThroughMessageQueue()
wnd_ = NULL; : wnd_(NULL),
msg_ = 0xFFFF; msg_(0xFFFF) {
} }
TaskMarshallerThroughMessageQueue::~TaskMarshallerThroughMessageQueue() { TaskMarshallerThroughMessageQueue::~TaskMarshallerThroughMessageQueue() {
DeleteAll(); ClearTasks();
} }
void TaskMarshallerThroughMessageQueue::PostTask( void TaskMarshallerThroughMessageQueue::PostTask(
const tracked_objects::Location& from_here, Task* task) { const tracked_objects::Location& from_here, const base::Closure& task) {
DCHECK(wnd_ != NULL); DCHECK(wnd_ != NULL);
lock_.Acquire(); lock_.Acquire();
bool has_work = !pending_tasks_.empty(); bool has_work = !pending_tasks_.empty();
...@@ -28,22 +28,27 @@ void TaskMarshallerThroughMessageQueue::PostTask( ...@@ -28,22 +28,27 @@ void TaskMarshallerThroughMessageQueue::PostTask(
if (!::PostMessage(wnd_, msg_, 0, 0)) { if (!::PostMessage(wnd_, msg_, 0, 0)) {
DVLOG(1) << "Dropping MSG_EXECUTE_TASK message for destroyed window."; DVLOG(1) << "Dropping MSG_EXECUTE_TASK message for destroyed window.";
DeleteAll(); ClearTasks();
} }
} }
void TaskMarshallerThroughMessageQueue::PostDelayedTask( void TaskMarshallerThroughMessageQueue::PostDelayedTask(
const tracked_objects::Location& source, const tracked_objects::Location& source,
Task* task, const base::Closure& task,
base::TimeDelta& delay) { base::TimeDelta& delay) {
DCHECK(wnd_ != NULL); DCHECK(wnd_);
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
DelayedTask delayed_task(task, base::Time::Now() + delay); base::PendingTask delayed_task(source, task, base::TimeTicks::Now() + delay,
true);
base::TimeTicks top_run_time = delayed_tasks_.top().delayed_run_time;
delayed_tasks_.push(delayed_task); delayed_tasks_.push(delayed_task);
// If we become the 'top' task - reschedule the timer.
if (delayed_tasks_.top().task == task) { // Reschedule the timer if |delayed_task| will be the next delayed task to
// run.
if (delayed_task.delayed_run_time < top_run_time) {
::SetTimer(wnd_, reinterpret_cast<UINT_PTR>(this), ::SetTimer(wnd_, reinterpret_cast<UINT_PTR>(this),
static_cast<DWORD>(delay.InMilliseconds()), NULL); static_cast<DWORD>(delay.InMilliseconds()), NULL);
} }
} }
...@@ -68,28 +73,27 @@ BOOL TaskMarshallerThroughMessageQueue::ProcessWindowMessage(HWND hWnd, ...@@ -68,28 +73,27 @@ BOOL TaskMarshallerThroughMessageQueue::ProcessWindowMessage(HWND hWnd,
return FALSE; return FALSE;
} }
Task* TaskMarshallerThroughMessageQueue::PopTask() { base::Closure TaskMarshallerThroughMessageQueue::PopTask() {
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
Task* task = NULL; if (pending_tasks_.empty())
if (!pending_tasks_.empty()) { return base::Closure();
task = pending_tasks_.front();
pending_tasks_.pop(); base::Closure task = pending_tasks_.front();
} pending_tasks_.pop();
return task; return task;
} }
void TaskMarshallerThroughMessageQueue::ExecuteQueuedTasks() { void TaskMarshallerThroughMessageQueue::ExecuteQueuedTasks() {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
Task* task; base::Closure task;
while ((task = PopTask()) != NULL) { while (!(task = PopTask()).is_null())
RunTask(task); task.Run();
}
} }
void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() { void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() {
DCHECK(CalledOnValidThread()); DCHECK(CalledOnValidThread());
::KillTimer(wnd_, reinterpret_cast<UINT_PTR>(this)); ::KillTimer(wnd_, reinterpret_cast<UINT_PTR>(this));
while (1) { while (true) {
lock_.Acquire(); lock_.Acquire();
if (delayed_tasks_.empty()) { if (delayed_tasks_.empty()) {
...@@ -97,13 +101,13 @@ void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() { ...@@ -97,13 +101,13 @@ void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() {
return; return;
} }
base::Time now = base::Time::Now(); base::PendingTask next_task = delayed_tasks_.top();
DelayedTask next_task = delayed_tasks_.top(); base::TimeTicks now = base::TimeTicks::Now();
base::Time next_run = next_task.run_at; base::TimeTicks next_run = next_task.delayed_run_time;
if (next_run > now) { if (next_run > now) {
int64 delay = (next_run - now).InMillisecondsRoundedUp(); int64 delay = (next_run - now).InMillisecondsRoundedUp();
::SetTimer(wnd_, reinterpret_cast<UINT_PTR>(this), ::SetTimer(wnd_, reinterpret_cast<UINT_PTR>(this),
static_cast<DWORD>(delay), NULL); static_cast<DWORD>(delay), NULL);
lock_.Release(); lock_.Release();
return; return;
} }
...@@ -112,46 +116,18 @@ void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() { ...@@ -112,46 +116,18 @@ void TaskMarshallerThroughMessageQueue::ExecuteDelayedTasks() {
lock_.Release(); lock_.Release();
// Run the task outside the lock. // Run the task outside the lock.
RunTask(next_task.task); next_task.task.Run();
} }
} }
void TaskMarshallerThroughMessageQueue::DeleteAll() { void TaskMarshallerThroughMessageQueue::ClearTasks() {
base::AutoLock lock(lock_); base::AutoLock lock(lock_);
DVLOG_IF(1, !pending_tasks_.empty()) << "Destroying " DVLOG_IF(1, !pending_tasks_.empty()) << "Destroying "
<< pending_tasks_.size() << pending_tasks_.size()
<< " pending tasks."; << " pending tasks.";
while (!pending_tasks_.empty()) { while (!pending_tasks_.empty())
Task* task = pending_tasks_.front();
pending_tasks_.pop(); pending_tasks_.pop();
delete task;
}
while (!delayed_tasks_.empty()) { while (!delayed_tasks_.empty())
delete delayed_tasks_.top().task;
delayed_tasks_.pop(); delayed_tasks_.pop();
}
}
void TaskMarshallerThroughMessageQueue::RunTask(Task* task) {
++invoke_task_;
task->Run();
--invoke_task_;
delete task;
}
bool TaskMarshallerThroughMessageQueue::DelayedTask::operator<(
const DelayedTask& other) const {
// Since the top of a priority queue is defined as the "greatest" element, we
// need to invert the comparison here. We want the smaller time to be at the
// top of the heap.
if (run_at < other.run_at)
return false;
if (run_at > other.run_at)
return true;
// If the times happen to match, then we use the sequence number to decide.
// Compare the difference to support integer roll-over.
return (seq - other.seq) > 0;
} }
// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
#include <deque> #include <deque>
#include <queue> #include <queue>
#include "base/callback.h"
#include "base/pending_task.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#include "base/threading/non_thread_safe.h" #include "base/threading/non_thread_safe.h"
#include "base/time.h" #include "base/time.h"
...@@ -26,7 +28,7 @@ namespace tracked_objects { ...@@ -26,7 +28,7 @@ namespace tracked_objects {
class TaskMarshallerThroughMessageQueue : public base::NonThreadSafe { class TaskMarshallerThroughMessageQueue : public base::NonThreadSafe {
public: public:
TaskMarshallerThroughMessageQueue(); TaskMarshallerThroughMessageQueue();
~TaskMarshallerThroughMessageQueue(); virtual ~TaskMarshallerThroughMessageQueue();
void SetWindow(HWND wnd, UINT msg) { void SetWindow(HWND wnd, UINT msg) {
wnd_ = wnd; wnd_ = wnd;
...@@ -34,35 +36,32 @@ class TaskMarshallerThroughMessageQueue : public base::NonThreadSafe { ...@@ -34,35 +36,32 @@ class TaskMarshallerThroughMessageQueue : public base::NonThreadSafe {
} }
virtual void PostTask(const tracked_objects::Location& from_here, virtual void PostTask(const tracked_objects::Location& from_here,
Task* task); const base::Closure& task);
virtual void PostDelayedTask(const tracked_objects::Location& source, virtual void PostDelayedTask(const tracked_objects::Location& source,
Task* task, const base::Closure& task,
base::TimeDelta& delay); base::TimeDelta& delay);
// Called by the owner of the HWND. // Called by the owner of the HWND.
BOOL ProcessWindowMessage(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam, BOOL ProcessWindowMessage(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam,
LRESULT& lResult, DWORD dwMsgMapID = 0); LRESULT& lResult, DWORD dwMsgMapID = 0);
private: private:
void DeleteAll(); void ClearTasks();
inline Task* PopTask(); inline base::Closure PopTask();
inline void ExecuteQueuedTasks(); inline void ExecuteQueuedTasks();
void ExecuteDelayedTasks(); void ExecuteDelayedTasks();
void RunTask(Task* task);
struct DelayedTask { // Shortest delays ordered at the top of the queue.
DelayedTask(Task* task, base::Time at) : run_at(at), task(task), seq(0) {} base::DelayedTaskQueue delayed_tasks_;
base::Time run_at;
Task* task; // A list of tasks that need to be processed by this instance.
int seq; std::queue<base::Closure> pending_tasks_;
// To support sorting based on time in priority_queue.
bool operator<(const DelayedTask& other) const;
};
std::priority_queue<DelayedTask> delayed_tasks_; // Lock accesses to |pending_tasks_|.
std::queue<Task*> pending_tasks_;
base::Lock lock_; base::Lock lock_;
// ::PostMessage parameters.
HWND wnd_; HWND wnd_;
UINT msg_; UINT msg_;
int invoke_task_;
}; };
#endif // CHROME_FRAME_TASK_MARSHALLER_H_ #endif // CHROME_FRAME_TASK_MARSHALLER_H_
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment