workers: Introduce a WorkerThreadMessageFilter to ease routing/processing worker messages.

Move the task of managing the task-runners for the worker-thread/main-thread in
a common implementation in WorkerThreadMessageFilter, and use that from the rest
of the message filters.

BUG=none
R=jochen@chromium.org, kinuko@chromium.org

Review URL: https://codereview.chromium.org/892543002

Cr-Commit-Position: refs/heads/master@{#314556}
parent 2f1f3565
......@@ -4,40 +4,34 @@
#include "content/child/bluetooth/bluetooth_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/bluetooth/bluetooth_dispatcher.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "ipc/ipc_message_macros.h"
namespace content {
BluetoothMessageFilter::BluetoothMessageFilter(ThreadSafeSender* sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(sender) {
: WorkerThreadMessageFilter(sender) {
}
BluetoothMessageFilter::~BluetoothMessageFilter() {
}
base::TaskRunner* BluetoothMessageFilter::OverrideTaskRunnerForMessage(
bool BluetoothMessageFilter::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == BluetoothMsgStart;
}
void BluetoothMessageFilter::OnFilteredMessageReceived(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != BluetoothMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
BluetoothDispatcher::GetOrCreateThreadSpecificInstance(thread_safe_sender())
->OnMessageReceived(msg);
}
bool BluetoothMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != BluetoothMsgStart)
return false;
BluetoothDispatcher::GetOrCreateThreadSpecificInstance(
thread_safe_sender_.get())->OnMessageReceived(msg);
return true;
bool BluetoothMessageFilter::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
} // namespace content
......@@ -5,30 +5,22 @@
#ifndef CONTENT_CHILD_BLUETOOTH_BLUETOOTH_MESSAGE_FILTER_H_
#define CONTENT_CHILD_BLUETOOTH_BLUETOOTH_MESSAGE_FILTER_H_
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
class BluetoothMessageFilter : public ChildMessageFilter {
class BluetoothMessageFilter : public WorkerThreadMessageFilter {
public:
explicit BluetoothMessageFilter(ThreadSafeSender* thread_safe_sender);
private:
~BluetoothMessageFilter() override;
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
DISALLOW_COPY_AND_ASSIGN(BluetoothMessageFilter);
};
......
......@@ -38,14 +38,13 @@ class WebFrame;
} // namespace blink
namespace content {
class BluetoothMessageFilter;
class ChildMessageFilter;
class ChildDiscardableSharedMemoryManager;
class ChildGpuMemoryBufferManager;
class ChildHistogramMessageFilter;
class ChildResourceMessageFilter;
class ChildSharedBitmapManager;
class FileSystemDispatcher;
class GeofencingMessageFilter;
class NavigatorConnectDispatcher;
class NotificationDispatcher;
class PushDispatcher;
......@@ -294,9 +293,8 @@ class CONTENT_EXPORT ChildThreadImpl
scoped_ptr<base::PowerMonitor> power_monitor_;
scoped_refptr<GeofencingMessageFilter> geofencing_message_filter_;
scoped_refptr<BluetoothMessageFilter> bluetooth_message_filter_;
scoped_refptr<ChildMessageFilter> geofencing_message_filter_;
scoped_refptr<ChildMessageFilter> bluetooth_message_filter_;
scoped_refptr<NavigatorConnectDispatcher> navigator_connect_dispatcher_;
......
......@@ -4,40 +4,33 @@
#include "content/child/geofencing/geofencing_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/geofencing/geofencing_dispatcher.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "ipc/ipc_message_macros.h"
namespace content {
GeofencingMessageFilter::GeofencingMessageFilter(ThreadSafeSender* sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(sender) {
: WorkerThreadMessageFilter(sender) {
}
GeofencingMessageFilter::~GeofencingMessageFilter() {
}
base::TaskRunner* GeofencingMessageFilter::OverrideTaskRunnerForMessage(
bool GeofencingMessageFilter::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == GeofencingMsgStart;
}
void GeofencingMessageFilter::OnFilteredMessageReceived(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != GeofencingMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
GeofencingDispatcher::GetOrCreateThreadSpecificInstance(thread_safe_sender())
->OnMessageReceived(msg);
}
bool GeofencingMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != GeofencingMsgStart)
return false;
GeofencingDispatcher::GetOrCreateThreadSpecificInstance(
thread_safe_sender_.get())->OnMessageReceived(msg);
return true;
bool GeofencingMessageFilter::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
} // namespace content
......@@ -5,30 +5,22 @@
#ifndef CONTENT_CHILD_GEOFENCING_GEOFENCING_MESSAGE_FILTER_H_
#define CONTENT_CHILD_GEOFENCING_GEOFENCING_MESSAGE_FILTER_H_
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
class GeofencingMessageFilter : public ChildMessageFilter {
class GeofencingMessageFilter : public WorkerThreadMessageFilter {
public:
explicit GeofencingMessageFilter(ThreadSafeSender* thread_safe_sender);
private:
~GeofencingMessageFilter() override;
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
DISALLOW_COPY_AND_ASSIGN(GeofencingMessageFilter);
};
......
......@@ -4,10 +4,8 @@
#include "content/child/indexed_db/indexed_db_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/indexed_db/indexed_db_dispatcher.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/indexed_db/indexed_db_constants.h"
#include "content/common/indexed_db/indexed_db_messages.h"
......@@ -15,29 +13,26 @@ namespace content {
IndexedDBMessageFilter::IndexedDBMessageFilter(
ThreadSafeSender* thread_safe_sender)
: main_thread_loop_(base::MessageLoopProxy::current()),
thread_safe_sender_(thread_safe_sender) {}
: WorkerThreadMessageFilter(thread_safe_sender) {
}
IndexedDBMessageFilter::~IndexedDBMessageFilter() {}
base::TaskRunner* IndexedDBMessageFilter::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != IndexedDBMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
bool IndexedDBMessageFilter::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == IndexedDBMsgStart;
}
bool IndexedDBMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != IndexedDBMsgStart)
return false;
IndexedDBDispatcher::ThreadSpecificInstance(thread_safe_sender_.get())
void IndexedDBMessageFilter::OnFilteredMessageReceived(
const IPC::Message& msg) {
IndexedDBDispatcher::ThreadSpecificInstance(thread_safe_sender())
->OnMessageReceived(msg);
return true;
}
bool IndexedDBMessageFilter::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
void IndexedDBMessageFilter::OnStaleMessageReceived(const IPC::Message& msg) {
......@@ -57,13 +52,13 @@ void IndexedDBMessageFilter::OnStaleSuccessIDBDatabase(
const IndexedDBDatabaseMetadata& idb_metadata) {
if (ipc_database_id == kNoDatabase)
return;
thread_safe_sender_->Send(
thread_safe_sender()->Send(
new IndexedDBHostMsg_DatabaseClose(ipc_database_id));
}
void IndexedDBMessageFilter::OnStaleUpgradeNeeded(
const IndexedDBMsg_CallbacksUpgradeNeeded_Params& p) {
thread_safe_sender_->Send(
thread_safe_sender()->Send(
new IndexedDBHostMsg_DatabaseClose(p.ipc_database_id));
}
......
......@@ -6,7 +6,7 @@
#define CONTENT_CHILD_INDEXED_DB_INDEXED_DB_MESSAGE_FILTER_H_
#include "base/memory/ref_counted.h"
#include "content/child/child_message_filter.h"
#include "content/child/worker_thread_message_filter.h"
struct IndexedDBDatabaseMetadata;
struct IndexedDBMsg_CallbacksUpgradeNeeded_Params;
......@@ -15,15 +15,9 @@ namespace base {
class MessageLoopProxy;
}
namespace IPC {
class Message;
}
namespace content {
class ThreadSafeSender;
class IndexedDBMessageFilter : public ChildMessageFilter {
class IndexedDBMessageFilter : public WorkerThreadMessageFilter {
public:
explicit IndexedDBMessageFilter(ThreadSafeSender* thread_safe_sender);
......@@ -31,10 +25,13 @@ class IndexedDBMessageFilter : public ChildMessageFilter {
~IndexedDBMessageFilter() override;
private:
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
// ChildMessageFilter:
void OnStaleMessageReceived(const IPC::Message& msg) override;
void OnStaleSuccessIDBDatabase(int32 ipc_thread_id,
......@@ -44,9 +41,6 @@ class IndexedDBMessageFilter : public ChildMessageFilter {
const IndexedDBDatabaseMetadata&);
void OnStaleUpgradeNeeded(const IndexedDBMsg_CallbacksUpgradeNeeded_Params&);
scoped_refptr<base::MessageLoopProxy> main_thread_loop_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
DISALLOW_COPY_AND_ASSIGN(IndexedDBMessageFilter);
};
......
......@@ -4,41 +4,33 @@
#include "content/child/navigator_connect/navigator_connect_dispatcher.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/navigator_connect/navigator_connect_provider.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/navigator_connect_messages.h"
namespace content {
NavigatorConnectDispatcher::NavigatorConnectDispatcher(ThreadSafeSender* sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(sender) {
: WorkerThreadMessageFilter(sender) {
}
NavigatorConnectDispatcher::~NavigatorConnectDispatcher() {
}
base::TaskRunner* NavigatorConnectDispatcher::OverrideTaskRunnerForMessage(
bool NavigatorConnectDispatcher::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == NavigatorConnectMsgStart;
}
void NavigatorConnectDispatcher::OnFilteredMessageReceived(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != NavigatorConnectMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
NavigatorConnectProvider::ThreadSpecificInstance(
thread_safe_sender(), main_thread_task_runner())->OnMessageReceived(msg);
}
bool NavigatorConnectDispatcher::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != NavigatorConnectMsgStart)
return false;
NavigatorConnectProvider::ThreadSpecificInstance(thread_safe_sender_.get(),
main_thread_loop_proxy_)
->OnMessageReceived(msg);
return true;
bool NavigatorConnectDispatcher::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
} // namespace content
......@@ -5,32 +5,24 @@
#ifndef CONTENT_CHILD_NAVIGATOR_CONNECT_NAVIGATOR_CONNECT_DISPATCHER_H_
#define CONTENT_CHILD_NAVIGATOR_CONNECT_NAVIGATOR_CONNECT_DISPATCHER_H_
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
// Receives IPC messages from the browser process and dispatches them to the
// correct thread specific NavigatorConnectProvider.
class NavigatorConnectDispatcher : public ChildMessageFilter {
class NavigatorConnectDispatcher : public WorkerThreadMessageFilter {
public:
explicit NavigatorConnectDispatcher(ThreadSafeSender* thread_safe_sender);
private:
~NavigatorConnectDispatcher() override;
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
DISALLOW_COPY_AND_ASSIGN(NavigatorConnectDispatcher);
};
......
......@@ -4,19 +4,14 @@
#include "content/child/notifications/notification_dispatcher.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/notifications/notification_manager.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/platform_notification_messages.h"
namespace content {
NotificationDispatcher::NotificationDispatcher(
ThreadSafeSender* thread_safe_sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(thread_safe_sender),
next_notification_id_(0) {
: WorkerThreadMessageFilter(thread_safe_sender), next_notification_id_(0) {
}
NotificationDispatcher::~NotificationDispatcher() {}
......@@ -27,43 +22,32 @@ int NotificationDispatcher::GenerateNotificationId(int thread_id) {
return next_notification_id_++;
}
base::TaskRunner* NotificationDispatcher::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return NULL;
bool NotificationDispatcher::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == PlatformNotificationMsgStart;
}
int notification_id = -1,
thread_id = 0;
void NotificationDispatcher::OnFilteredMessageReceived(
const IPC::Message& msg) {
NotificationManager::ThreadSpecificInstance(thread_safe_sender(),
main_thread_task_runner(),
this)->OnMessageReceived(msg);
}
bool NotificationDispatcher::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
int notification_id = -1;
const bool success = PickleIterator(msg).ReadInt(&notification_id);
DCHECK(success);
{
base::AutoLock lock(notification_id_map_lock_);
auto iterator = notification_id_map_.find(notification_id);
if (iterator != notification_id_map_.end())
thread_id = iterator->second;
base::AutoLock lock(notification_id_map_lock_);
auto iterator = notification_id_map_.find(notification_id);
if (iterator != notification_id_map_.end()) {
*ipc_thread_id = iterator->second;
return true;
}
if (!thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(thread_id);
}
bool NotificationDispatcher::OnMessageReceived(const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return false;
NotificationManager::ThreadSpecificInstance(
thread_safe_sender_.get(),
main_thread_loop_proxy_.get(),
this)->OnMessageReceived(msg);
return true;
}
bool NotificationDispatcher::ShouldHandleMessage(const IPC::Message& msg) {
return IPC_MESSAGE_CLASS(msg) == PlatformNotificationMsgStart;
return false;
}
} // namespace content
......@@ -7,19 +7,12 @@
#include <map>
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
class NotificationDispatcher : public ChildMessageFilter {
class NotificationDispatcher : public WorkerThreadMessageFilter {
public:
explicit NotificationDispatcher(ThreadSafeSender* thread_safe_sender);
......@@ -31,15 +24,11 @@ class NotificationDispatcher : public ChildMessageFilter {
~NotificationDispatcher() override;
private:
bool ShouldHandleMessage(const IPC::Message& msg);
// ChildMessageFilter implementation.
base::TaskRunner* OverrideTaskRunnerForMessage(const IPC::Message& msg)
override;
bool OnMessageReceived(const IPC::Message& msg) override;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
using NotificationIdToThreadId = std::map<int, int>;
......
......@@ -4,19 +4,13 @@
#include "content/child/push_messaging/push_dispatcher.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/pickle.h"
#include "content/child/push_messaging/push_provider.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/push_messaging_messages.h"
namespace content {
PushDispatcher::PushDispatcher(ThreadSafeSender* thread_safe_sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(thread_safe_sender),
next_request_id_(0) {
: WorkerThreadMessageFilter(thread_safe_sender), next_request_id_(0) {
}
PushDispatcher::~PushDispatcher() {
......@@ -28,43 +22,7 @@ int PushDispatcher::GenerateRequestId(int thread_id) {
return next_request_id_++;
}
base::TaskRunner* PushDispatcher::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return nullptr;
int request_id = -1;
int thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&request_id);
DCHECK(success);
{
base::AutoLock lock(request_id_map_lock_);
auto it = request_id_map_.find(request_id);
if (it != request_id_map_.end()) {
thread_id = it->second;
request_id_map_.erase(it);
}
}
if (!thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(thread_id);
}
bool PushDispatcher::OnMessageReceived(const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return false;
bool handled = PushProvider::ThreadSpecificInstance(
thread_safe_sender_.get(), this)->OnMessageReceived(msg);
DCHECK(handled);
return handled;
}
bool PushDispatcher::ShouldHandleMessage(const IPC::Message& msg) {
bool PushDispatcher::ShouldHandleMessage(const IPC::Message& msg) const {
// Note that not all Push API IPC messages flow through this class. A subset
// of the API functionality requires a direct association with a document and
// a frame, and for those cases the IPC messages are handled by a
......@@ -79,4 +37,27 @@ bool PushDispatcher::ShouldHandleMessage(const IPC::Message& msg) {
msg.type() == PushMessagingMsg_UnregisterError::ID;
}
void PushDispatcher::OnFilteredMessageReceived(const IPC::Message& msg) {
bool handled = PushProvider::ThreadSpecificInstance(
thread_safe_sender(), this)->OnMessageReceived(msg);
DCHECK(handled);
}
bool PushDispatcher::GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) {
int request_id = -1;
const bool success = PickleIterator(msg).ReadInt(&request_id);
DCHECK(success);
base::AutoLock lock(request_id_map_lock_);
auto it = request_id_map_.find(request_id);
if (it != request_id_map_.end()) {
*ipc_thread_id = it->second;
request_id_map_.erase(it);
return true;
}
return false;
}
} // namespace content
......@@ -7,20 +7,12 @@
#include <map>
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "content/child/child_message_filter.h"
#include "content/public/common/push_messaging_status.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
class PushDispatcher : public ChildMessageFilter {
class PushDispatcher : public WorkerThreadMessageFilter {
public:
explicit PushDispatcher(ThreadSafeSender* thread_safe_sender);
......@@ -34,15 +26,11 @@ class PushDispatcher : public ChildMessageFilter {
~PushDispatcher() override;
private:
bool ShouldHandleMessage(const IPC::Message& msg);
// ChildMessageFilter implementation.
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
base::Lock request_id_map_lock_;
std::map<int, int> request_id_map_; // Maps request id to thread id.
......
......@@ -11,6 +11,7 @@
#include "base/memory/ref_counted.h"
#include "content/child/push_messaging/push_dispatcher.h"
#include "content/child/worker_task_runner.h"
#include "content/public/common/push_messaging_status.h"
#include "third_party/WebKit/public/platform/WebPushError.h"
#include "third_party/WebKit/public/platform/WebPushProvider.h"
......
......@@ -4,19 +4,13 @@
#include "content/child/quota_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/quota_dispatcher.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/quota_messages.h"
namespace content {
QuotaMessageFilter::QuotaMessageFilter(
ThreadSafeSender* thread_safe_sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(thread_safe_sender),
next_request_id_(0) {
QuotaMessageFilter::QuotaMessageFilter(ThreadSafeSender* thread_safe_sender)
: WorkerThreadMessageFilter(thread_safe_sender), next_request_id_(0) {
}
QuotaMessageFilter::~QuotaMessageFilter() {}
......@@ -38,35 +32,29 @@ void QuotaMessageFilter::ClearThreadRequests(int thread_id) {
}
}
base::TaskRunner* QuotaMessageFilter::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != QuotaMsgStart)
return NULL;
bool QuotaMessageFilter::ShouldHandleMessage(const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == QuotaMsgStart;
}
int request_id = -1, thread_id = 0;
void QuotaMessageFilter::OnFilteredMessageReceived(const IPC::Message& msg) {
QuotaDispatcher::ThreadSpecificInstance(thread_safe_sender(), this)
->OnMessageReceived(msg);
}
bool QuotaMessageFilter::GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) {
int request_id = -1;
const bool success = PickleIterator(msg).ReadInt(&request_id);
DCHECK(success);
{
base::AutoLock lock(request_id_map_lock_);
RequestIdToThreadId::iterator found = request_id_map_.find(request_id);
if (found != request_id_map_.end()) {
thread_id = found->second;
request_id_map_.erase(found);
}
base::AutoLock lock(request_id_map_lock_);
RequestIdToThreadId::iterator found = request_id_map_.find(request_id);
if (found != request_id_map_.end()) {
*ipc_thread_id = found->second;
request_id_map_.erase(found);
return true;
}
if (!thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(thread_id);
}
bool QuotaMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != QuotaMsgStart)
return false;
QuotaDispatcher::ThreadSpecificInstance(thread_safe_sender_.get(), this)
->OnMessageReceived(msg);
return true;
return false;
}
} // namespace content
......@@ -7,19 +7,12 @@
#include <map>
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class ThreadSafeSender;
class QuotaMessageFilter : public ChildMessageFilter {
class QuotaMessageFilter : public WorkerThreadMessageFilter {
public:
explicit QuotaMessageFilter(ThreadSafeSender* thread_safe_sender);
......@@ -35,16 +28,14 @@ class QuotaMessageFilter : public ChildMessageFilter {
~QuotaMessageFilter() override;
private:
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
typedef std::map<int, int> RequestIdToThreadId;
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
base::Lock request_id_map_lock_;
RequestIdToThreadId request_id_map_;
int next_request_id_;
......
......@@ -4,10 +4,8 @@
#include "content/child/service_worker/service_worker_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/service_worker/service_worker_dispatcher.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/common/service_worker/service_worker_messages.h"
#include "content/common/service_worker/service_worker_types.h"
#include "ipc/ipc_message_macros.h"
......@@ -39,29 +37,26 @@ void SendRegistrationObjectDestroyed(
} // namespace
ServiceWorkerMessageFilter::ServiceWorkerMessageFilter(ThreadSafeSender* sender)
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(sender) {}
: WorkerThreadMessageFilter(sender) {
}
ServiceWorkerMessageFilter::~ServiceWorkerMessageFilter() {}
base::TaskRunner* ServiceWorkerMessageFilter::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != ServiceWorkerMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
bool ServiceWorkerMessageFilter::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == ServiceWorkerMsgStart;
}
bool ServiceWorkerMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != ServiceWorkerMsgStart)
return false;
void ServiceWorkerMessageFilter::OnFilteredMessageReceived(
const IPC::Message& msg) {
ServiceWorkerDispatcher::GetOrCreateThreadSpecificInstance(
thread_safe_sender_.get())->OnMessageReceived(msg);
return true;
thread_safe_sender())->OnMessageReceived(msg);
}
bool ServiceWorkerMessageFilter::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
void ServiceWorkerMessageFilter::OnStaleMessageReceived(
......@@ -83,13 +78,13 @@ void ServiceWorkerMessageFilter::OnStaleRegistered(
int request_id,
const ServiceWorkerRegistrationObjectInfo& info,
const ServiceWorkerVersionAttributes& attrs) {
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.installing.handle_id);
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.waiting.handle_id);
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.active.handle_id);
SendRegistrationObjectDestroyed(thread_safe_sender_.get(), info.handle_id);
SendRegistrationObjectDestroyed(thread_safe_sender(), info.handle_id);
}
void ServiceWorkerMessageFilter::OnStaleSetVersionAttributes(
......@@ -98,11 +93,11 @@ void ServiceWorkerMessageFilter::OnStaleSetVersionAttributes(
int registration_handle_id,
int changed_mask,
const ServiceWorkerVersionAttributes& attrs) {
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.installing.handle_id);
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.waiting.handle_id);
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(),
SendServiceWorkerObjectDestroyed(thread_safe_sender(),
attrs.active.handle_id);
// Don't have to decrement registration refcount because the sender of the
// SetVersionAttributes message doesn't increment it.
......@@ -113,7 +108,7 @@ void ServiceWorkerMessageFilter::OnStaleSetControllerServiceWorker(
int provider_id,
const ServiceWorkerObjectInfo& info,
bool should_notify_controllerchange) {
SendServiceWorkerObjectDestroyed(thread_safe_sender_.get(), info.handle_id);
SendServiceWorkerObjectDestroyed(thread_safe_sender(), info.handle_id);
}
} // namespace content
......@@ -5,22 +5,17 @@
#ifndef CONTENT_CHILD_SERVICE_WORKER_SERVICE_WORKER_MESSAGE_FILTER_H_
#define CONTENT_CHILD_SERVICE_WORKER_SERVICE_WORKER_MESSAGE_FILTER_H_
#include "content/child/child_message_filter.h"
#include "content/child/worker_thread_message_filter.h"
#include "content/common/content_export.h"
namespace base {
class MessageLoopProxy;
}
namespace content {
class ThreadSafeSender;
struct ServiceWorkerObjectInfo;
struct ServiceWorkerRegistrationObjectInfo;
struct ServiceWorkerVersionAttributes;
class CONTENT_EXPORT ServiceWorkerMessageFilter
: public NON_EXPORTED_BASE(ChildMessageFilter) {
: public NON_EXPORTED_BASE(WorkerThreadMessageFilter) {
public:
explicit ServiceWorkerMessageFilter(ThreadSafeSender* thread_safe_sender);
......@@ -28,10 +23,13 @@ class CONTENT_EXPORT ServiceWorkerMessageFilter
~ServiceWorkerMessageFilter() override;
private:
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
// ChildMessageFilter:
void OnStaleMessageReceived(const IPC::Message& msg) override;
// Message handlers for stale messages.
......@@ -52,9 +50,6 @@ class CONTENT_EXPORT ServiceWorkerMessageFilter
const ServiceWorkerObjectInfo& info,
bool should_notify_controllerchange);
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
DISALLOW_COPY_AND_ASSIGN(ServiceWorkerMessageFilter);
};
......
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "content/child/worker_thread_message_filter.h"
#include "base/thread_task_runner_handle.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "ipc/ipc_message_macros.h"
namespace content {
WorkerThreadMessageFilter::WorkerThreadMessageFilter(
ThreadSafeSender* thread_safe_sender)
: main_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()),
thread_safe_sender_(thread_safe_sender) {
}
WorkerThreadMessageFilter::~WorkerThreadMessageFilter() {
}
base::TaskRunner* WorkerThreadMessageFilter::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return nullptr;
int ipc_thread_id = 0;
const bool success = GetWorkerThreadIdForMessage(msg, &ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_task_runner_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
}
bool WorkerThreadMessageFilter::OnMessageReceived(const IPC::Message& msg) {
if (!ShouldHandleMessage(msg))
return false;
OnFilteredMessageReceived(msg);
return true;
}
} // namespace content
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CONTENT_CHILD_WORKER_THREAD_MESSAGE_FILTER_H_
#define CONTENT_CHILD_WORKER_THREAD_MESSAGE_FILTER_H_
#include "content/child/child_message_filter.h"
namespace base {
class SingleThreadTaskRunner;
}
namespace content {
class ThreadSafeSender;
// A base class for filtering IPC messages targeted for worker threads.
class WorkerThreadMessageFilter : public ChildMessageFilter {
public:
explicit WorkerThreadMessageFilter(ThreadSafeSender* thread_safe_sender);
protected:
~WorkerThreadMessageFilter() override;
base::SingleThreadTaskRunner* main_thread_task_runner() {
return main_thread_task_runner_.get();
}
ThreadSafeSender* thread_safe_sender() { return thread_safe_sender_.get(); }
private:
// Returns whether this filter should process |msg|.
virtual bool ShouldHandleMessage(const IPC::Message& msg) const = 0;
// Processes the IPC message in the worker thread, if the filter could extract
// its thread id. Otherwise, runs in the main thread. It only receives a
// message if ShouldHandleMessage() returns true for it.
virtual void OnFilteredMessageReceived(const IPC::Message& msg) = 0;
// Attempts to extract the thread-id of the worker-thread that should process
// the IPC message. Returns whether the thread-id could be determined and set
// in |ipc_thread_id|.
virtual bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) = 0;
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(const IPC::Message& msg) final;
bool OnMessageReceived(const IPC::Message& msg) final;
scoped_refptr<base::SingleThreadTaskRunner> main_thread_task_runner_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
DISALLOW_COPY_AND_ASSIGN(WorkerThreadMessageFilter);
};
} // namespace content
#endif // CONTENT_CHILD_WORKER_THREAD_MESSAGE_FILTER_H_
......@@ -273,6 +273,8 @@
'child/weburlresponse_extradata_impl.h',
'child/worker_task_runner.cc',
'child/worker_task_runner.h',
'child/worker_thread_message_filter.cc',
'child/worker_thread_message_filter.h',
'child/worker_thread_task_runner.cc',
'child/worker_thread_task_runner.h',
],
......
......@@ -4,45 +4,38 @@
#include "content/renderer/service_worker/embedded_worker_context_message_filter.h"
#include "base/message_loop/message_loop_proxy.h"
#include "content/child/child_thread_impl.h"
#include "content/child/thread_safe_sender.h"
#include "content/child/worker_thread_task_runner.h"
#include "content/renderer/service_worker/embedded_worker_context_client.h"
#include "ipc/ipc_message_macros.h"
namespace content {
EmbeddedWorkerContextMessageFilter::EmbeddedWorkerContextMessageFilter()
: main_thread_loop_proxy_(base::MessageLoopProxy::current()),
thread_safe_sender_(ChildThreadImpl::current()->thread_safe_sender()) {}
: WorkerThreadMessageFilter(
ChildThreadImpl::current()->thread_safe_sender()) {
}
EmbeddedWorkerContextMessageFilter::~EmbeddedWorkerContextMessageFilter() {}
base::TaskRunner*
EmbeddedWorkerContextMessageFilter::OverrideTaskRunnerForMessage(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != EmbeddedWorkerContextMsgStart)
return NULL;
int ipc_thread_id = 0;
const bool success = PickleIterator(msg).ReadInt(&ipc_thread_id);
DCHECK(success);
if (!ipc_thread_id)
return main_thread_loop_proxy_.get();
return new WorkerThreadTaskRunner(ipc_thread_id);
bool EmbeddedWorkerContextMessageFilter::ShouldHandleMessage(
const IPC::Message& msg) const {
return IPC_MESSAGE_CLASS(msg) == EmbeddedWorkerContextMsgStart;
}
bool EmbeddedWorkerContextMessageFilter::OnMessageReceived(
void EmbeddedWorkerContextMessageFilter::OnFilteredMessageReceived(
const IPC::Message& msg) {
if (IPC_MESSAGE_CLASS(msg) != EmbeddedWorkerContextMsgStart)
return false;
EmbeddedWorkerContextClient* client =
EmbeddedWorkerContextClient::ThreadSpecificInstance();
if (!client) {
if (!client)
LOG(ERROR) << "Stray message is sent to nonexistent worker";
return true;
}
return client->OnMessageReceived(msg);
else
client->OnMessageReceived(msg);
}
bool EmbeddedWorkerContextMessageFilter::GetWorkerThreadIdForMessage(
const IPC::Message& msg,
int* ipc_thread_id) {
return PickleIterator(msg).ReadInt(ipc_thread_id);
}
} // namespace content
......@@ -5,30 +5,24 @@
#ifndef CONTENT_RENDERER_SERVICE_WORKER_EMBEDDED_WORKER_CONTEXT_MESSAGE_FILTER_H_
#define CONTENT_RENDERER_SERVICE_WORKER_EMBEDDED_WORKER_CONTEXT_MESSAGE_FILTER_H_
#include "content/child/child_message_filter.h"
namespace base {
class MessageLoopProxy;
}
#include "content/child/worker_thread_message_filter.h"
namespace content {
class EmbeddedWorkerContextMessageFilter : public ChildMessageFilter {
class EmbeddedWorkerContextMessageFilter : public WorkerThreadMessageFilter {
public:
EmbeddedWorkerContextMessageFilter();
protected:
~EmbeddedWorkerContextMessageFilter() override;
// ChildMessageFilter implementation:
base::TaskRunner* OverrideTaskRunnerForMessage(
const IPC::Message& msg) override;
bool OnMessageReceived(const IPC::Message& msg) override;
// WorkerThreadMessageFilter:
bool ShouldHandleMessage(const IPC::Message& msg) const override;
void OnFilteredMessageReceived(const IPC::Message& msg) override;
bool GetWorkerThreadIdForMessage(const IPC::Message& msg,
int* ipc_thread_id) override;
private:
scoped_refptr<base::MessageLoopProxy> main_thread_loop_proxy_;
scoped_refptr<ThreadSafeSender> thread_safe_sender_;
DISALLOW_COPY_AND_ASSIGN(EmbeddedWorkerContextMessageFilter);
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment