Commit 4faacf02 authored by sky@chromium.org's avatar sky@chromium.org

Performance tuning of HandleWatcher

Couple of tweaks:
. make WatcherThreadManager maintain a queue of requests and process
  them at once.
. Makes State not unnecessarily cancel requests if it was told the
  handle is ready.

BUG=none
TEST=none
R=darin@chromium.org

Review URL: https://codereview.chromium.org/480293004

Cr-Commit-Position: refs/heads/master@{#291436}
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@291436 0039d316-1c4b-4281-b951-d872f2087c98
parent 7df9b70a
......@@ -71,9 +71,8 @@ class WatcherBackend : public MessagePumpMojoHandler {
void StartWatching(const WatchData& data);
// Cancels a previously schedule request to start a watch. When done signals
// |event|.
void StopWatching(WatcherID watcher_id, base::WaitableEvent* event);
// Cancels a previously scheduled request to start a watch.
void StopWatching(WatcherID watcher_id);
private:
typedef std::map<Handle, WatchData> HandleToWatchDataMap;
......@@ -112,8 +111,7 @@ void WatcherBackend::StartWatching(const WatchData& data) {
data.deadline);
}
void WatcherBackend::StopWatching(WatcherID watcher_id,
base::WaitableEvent* event) {
void WatcherBackend::StopWatching(WatcherID watcher_id) {
// Because of the thread hop it is entirely possible to get here and not
// have a valid handle registered for |watcher_id|.
Handle handle;
......@@ -121,7 +119,6 @@ void WatcherBackend::StopWatching(WatcherID watcher_id,
handle_to_data_.erase(handle);
message_pump_mojo->RemoveHandler(handle);
}
event->Signal();
}
void WatcherBackend::RemoveAndNotify(const Handle& handle,
......@@ -182,15 +179,47 @@ class WatcherThreadManager {
void StopWatching(WatcherID watcher_id);
private:
enum RequestType {
REQUEST_START,
REQUEST_STOP,
};
// See description of |requests_| for details.
struct RequestData {
RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
RequestType type;
WatchData start_data;
WatcherID stop_id;
base::WaitableEvent* stop_event;
};
typedef std::vector<RequestData> Requests;
friend struct DefaultSingletonTraits<WatcherThreadManager>;
WatcherThreadManager();
// Schedules a request on the background thread. See |requests_| for details.
void AddRequest(const RequestData& data);
// Processes requests added to |requests_|. This is invoked on the backend
// thread.
void ProcessRequestsOnBackendThread();
base::Thread thread_;
base::AtomicSequenceNumber watcher_id_generator_;
WatcherBackend backend_;
// Protects |requests_|.
base::Lock lock_;
// Start/Stop result in adding a RequestData to |requests_| (protected by
// |lock_|). When the background thread wakes up it processes the requests.
Requests requests_;
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
};
......@@ -207,37 +236,77 @@ WatcherID WatcherThreadManager::StartWatching(
MojoHandleSignals handle_signals,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback) {
WatchData data;
data.id = watcher_id_generator_.GetNext();
data.handle = handle;
data.callback = callback;
data.handle_signals = handle_signals;
data.deadline = deadline;
data.message_loop = base::MessageLoopProxy::current();
RequestData request_data;
request_data.type = REQUEST_START;
request_data.start_data.id = watcher_id_generator_.GetNext();
request_data.start_data.handle = handle;
request_data.start_data.callback = callback;
request_data.start_data.handle_signals = handle_signals;
request_data.start_data.deadline = deadline;
request_data.start_data.message_loop = base::MessageLoopProxy::current();
DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
data.message_loop.get());
// We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StartWatching,
base::Unretained(&backend_),
data));
return data.id;
request_data.start_data.message_loop.get());
AddRequest(request_data);
return request_data.start_data.id;
}
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
// Handle the case of StartWatching() followed by StopWatching() before
// |thread_| woke up.
{
base::AutoLock auto_lock(lock_);
for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
// Watcher ids are not reused, so if we find it we can stop.
requests_.erase(i);
return;
}
}
}
base::ThreadRestrictions::ScopedAllowWait allow_wait;
base::WaitableEvent event(true, false);
RequestData request_data;
request_data.type = REQUEST_STOP;
request_data.stop_id = watcher_id;
request_data.stop_event = &event;
AddRequest(request_data);
// We need to block until the handle is actually removed.
event.Wait();
}
void WatcherThreadManager::AddRequest(const RequestData& data) {
{
base::AutoLock auto_lock(lock_);
const bool was_empty = requests_.empty();
requests_.push_back(data);
if (!was_empty)
return;
}
// We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StopWatching,
base::Unretained(&backend_),
watcher_id,
&event));
base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
base::Unretained(this)));
}
// We need to block until the handle is actually removed.
event.Wait();
void WatcherThreadManager::ProcessRequestsOnBackendThread() {
DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
Requests requests;
{
base::AutoLock auto_lock(lock_);
requests_.swap(requests);
}
for (size_t i = 0; i < requests.size(); ++i) {
if (requests[i].type == REQUEST_START) {
backend_.StartWatching(requests[i].start_data);
} else {
backend_.StopWatching(requests[i].stop_id);
requests[i].stop_event->Signal();
}
}
}
WatcherThreadManager::WatcherThreadManager()
......@@ -261,6 +330,7 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
const base::Callback<void(MojoResult)>& callback)
: watcher_(watcher),
callback_(callback),
got_ready_(false),
weak_factory_(this) {
base::MessageLoop::current()->AddDestructionObserver(this);
......@@ -274,16 +344,27 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
virtual ~State() {
base::MessageLoop::current()->RemoveDestructionObserver(this);
WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
// If we've been notified the handle is ready (|got_ready_| is true) then
// the watch has been implicitly removed by
// WatcherThreadManager/MessagePumpMojo and we don't have to call
// StopWatching(). To do so would needlessly entail posting a task and
// blocking until the background thread services it.
if (!got_ready_)
WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
}
private:
virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
// The current thread is exiting. Simulate a watch error.
OnHandleReady(MOJO_RESULT_ABORTED);
NotifyAndDestroy(MOJO_RESULT_ABORTED);
}
void OnHandleReady(MojoResult result) {
got_ready_ = true;
NotifyAndDestroy(result);
}
void NotifyAndDestroy(MojoResult result) {
base::Callback<void(MojoResult)> callback = callback_;
watcher_->Stop(); // Destroys |this|.
......@@ -294,6 +375,9 @@ class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
WatcherID watcher_id_;
base::Callback<void(MojoResult)> callback_;
// Have we been notified that the handle is ready?
bool got_ready_;
// Used to weakly bind |this| to the WatcherThreadManager.
base::WeakPtrFactory<State> weak_factory_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment