Commit 0c7c18bf authored by oysteine's avatar oysteine Committed by Commit bot

Threaded data provider: Support main thread data notifications (Chrome side)

The threaded data receiver will now receive notifications about received data packets on the main thread (needed for the progress indicator to work properly), and can optionally specify that it wants to receive a full copy of the data (needed when the Inspector is attached).

If a threaded data receiver is attached to a resource request, we also now make sure the resource completed message is bounced via the background thread, to make sure all data is received on the main thread first.

Blink side patch: https://codereview.chromium.org/690793003

BUG=398076

Review URL: https://codereview.chromium.org/689713004

Cr-Commit-Position: refs/heads/master@{#314704}
parent 4ea5490a
...@@ -576,6 +576,17 @@ void ResourceDispatcher::OnRequestComplete( ...@@ -576,6 +576,17 @@ void ResourceDispatcher::OnRequestComplete(
base::TimeTicks renderer_completion_time = ToRendererCompletionTime( base::TimeTicks renderer_completion_time = ToRendererCompletionTime(
*request_info, request_complete_data.completion_time); *request_info, request_complete_data.completion_time);
// If we have a threaded data provider, this message needs to bounce off the
// background thread before it's returned to this thread and handled,
// to make sure it's processed after all incoming data.
if (request_info->threaded_data_provider) {
request_info->threaded_data_provider->OnRequestCompleteForegroundThread(
weak_factory_.GetWeakPtr(), request_complete_data,
renderer_completion_time);
return;
}
// The request ID will be removed from our pending list in the destructor. // The request ID will be removed from our pending list in the destructor.
// Normally, dispatching this message causes the reference-counted request to // Normally, dispatching this message causes the reference-counted request to
// die immediately. // die immediately.
...@@ -587,6 +598,23 @@ void ResourceDispatcher::OnRequestComplete( ...@@ -587,6 +598,23 @@ void ResourceDispatcher::OnRequestComplete(
request_complete_data.encoded_data_length); request_complete_data.encoded_data_length);
} }
void ResourceDispatcher::CompletedRequestAfterBackgroundThreadFlush(
int request_id,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time) {
PendingRequestInfo* request_info = GetPendingRequestInfo(request_id);
if (!request_info)
return;
RequestPeer* peer = request_info->peer;
peer->OnCompletedRequest(request_complete_data.error_code,
request_complete_data.was_ignored_by_handler,
request_complete_data.exists_in_cache,
request_complete_data.security_info,
renderer_completion_time,
request_complete_data.encoded_data_length);
}
int ResourceDispatcher::AddPendingRequest(RequestPeer* callback, int ResourceDispatcher::AddPendingRequest(RequestPeer* callback,
ResourceType resource_type, ResourceType resource_type,
int origin_pid, int origin_pid,
......
...@@ -91,6 +91,14 @@ class CONTENT_EXPORT ResourceDispatcher : public IPC::Listener { ...@@ -91,6 +91,14 @@ class CONTENT_EXPORT ResourceDispatcher : public IPC::Listener {
bool AttachThreadedDataReceiver( bool AttachThreadedDataReceiver(
int request_id, blink::WebThreadedDataReceiver* threaded_data_receiver); int request_id, blink::WebThreadedDataReceiver* threaded_data_receiver);
// If we have a ThreadedDataProvider attached, an OnRequestComplete message
// will get bounced via the background thread and then passed to this function
// to resume processing.
void CompletedRequestAfterBackgroundThreadFlush(
int request_id,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time);
IPC::Sender* message_sender() const { return message_sender_; } IPC::Sender* message_sender() const { return message_sender_; }
// This does not take ownership of the delegate. It is expected that the // This does not take ownership of the delegate. It is expected that the
......
...@@ -157,13 +157,13 @@ ThreadedDataProvider::~ThreadedDataProvider() { ...@@ -157,13 +157,13 @@ ThreadedDataProvider::~ThreadedDataProvider() {
delete threaded_data_receiver_; delete threaded_data_receiver_;
} }
void DestructOnMainThread(ThreadedDataProvider* data_provider) { void ThreadedDataProvider::DestructOnMainThread() {
DCHECK(ChildThreadImpl::current()); DCHECK(ChildThreadImpl::current());
// The ThreadedDataProvider must be destructed on the main thread to // The ThreadedDataProvider must be destructed on the main thread to
// be threadsafe when removing the message filter and releasing the shared // be threadsafe when removing the message filter and releasing the shared
// memory buffer. // memory buffer.
delete data_provider; delete this;
} }
void ThreadedDataProvider::Stop() { void ThreadedDataProvider::Stop() {
...@@ -203,7 +203,35 @@ void ThreadedDataProvider::StopOnBackgroundThread() { ...@@ -203,7 +203,35 @@ void ThreadedDataProvider::StopOnBackgroundThread() {
// use this instance on the background thread. // use this instance on the background thread.
background_thread_weak_factory_.reset(NULL); background_thread_weak_factory_.reset(NULL);
main_thread_task_runner_->PostTask(FROM_HERE, main_thread_task_runner_->PostTask(FROM_HERE,
base::Bind(&DestructOnMainThread, this)); base::Bind(&ThreadedDataProvider::DestructOnMainThread,
base::Unretained(this)));
}
void ThreadedDataProvider::OnRequestCompleteForegroundThread(
base::WeakPtr<ResourceDispatcher> resource_dispatcher,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time) {
DCHECK(ChildThreadImpl::current());
background_thread_.message_loop()->PostTask(FROM_HERE,
base::Bind(&ThreadedDataProvider::OnRequestCompleteBackgroundThread,
base::Unretained(this), resource_dispatcher,
request_complete_data, renderer_completion_time));
}
void ThreadedDataProvider::OnRequestCompleteBackgroundThread(
base::WeakPtr<ResourceDispatcher> resource_dispatcher,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time) {
DCHECK(background_thread_.isCurrentThread());
main_thread_task_runner_->PostTask(FROM_HERE,
base::Bind(
&ResourceDispatcher::CompletedRequestAfterBackgroundThreadFlush,
resource_dispatcher,
request_id_,
request_complete_data,
renderer_completion_time));
} }
void ThreadedDataProvider::OnResourceMessageFilterAddedMainThread() { void ThreadedDataProvider::OnResourceMessageFilterAddedMainThread() {
...@@ -229,7 +257,7 @@ void ThreadedDataProvider::OnResourceMessageFilterAddedBackgroundThread() { ...@@ -229,7 +257,7 @@ void ThreadedDataProvider::OnResourceMessageFilterAddedBackgroundThread() {
if (!queued_data_.empty()) { if (!queued_data_.empty()) {
std::vector<QueuedSharedMemoryData>::iterator iter = queued_data_.begin(); std::vector<QueuedSharedMemoryData>::iterator iter = queued_data_.begin();
for (; iter != queued_data_.end(); ++iter) { for (; iter != queued_data_.end(); ++iter) {
ForwardAndACKData(iter->data, iter->length); ForwardAndACKData(iter->data, iter->length, iter->encoded_length);
} }
queued_data_.clear(); queued_data_.clear();
...@@ -247,7 +275,7 @@ void ThreadedDataProvider::OnReceivedDataOnBackgroundThread( ...@@ -247,7 +275,7 @@ void ThreadedDataProvider::OnReceivedDataOnBackgroundThread(
CHECK(data_ptr + data_offset); CHECK(data_ptr + data_offset);
if (resource_filter_active_) { if (resource_filter_active_) {
ForwardAndACKData(data_ptr + data_offset, data_length); ForwardAndACKData(data_ptr + data_offset, data_length, encoded_data_length);
} else { } else {
// There's a brief interval between the point where we know the filter // There's a brief interval between the point where we know the filter
// has been installed on the I/O thread, and when we know for sure there's // has been installed on the I/O thread, and when we know for sure there's
...@@ -258,6 +286,7 @@ void ThreadedDataProvider::OnReceivedDataOnBackgroundThread( ...@@ -258,6 +286,7 @@ void ThreadedDataProvider::OnReceivedDataOnBackgroundThread(
QueuedSharedMemoryData queued_data; QueuedSharedMemoryData queued_data;
queued_data.data = data_ptr + data_offset; queued_data.data = data_ptr + data_offset;
queued_data.length = data_length; queued_data.length = data_length;
queued_data.encoded_length = encoded_data_length;
queued_data_.push_back(queued_data); queued_data_.push_back(queued_data);
} }
} }
...@@ -269,11 +298,12 @@ void ThreadedDataProvider::OnReceivedDataOnForegroundThread( ...@@ -269,11 +298,12 @@ void ThreadedDataProvider::OnReceivedDataOnForegroundThread(
background_thread_.message_loop()->PostTask(FROM_HERE, background_thread_.message_loop()->PostTask(FROM_HERE,
base::Bind(&ThreadedDataProvider::ForwardAndACKData, base::Bind(&ThreadedDataProvider::ForwardAndACKData,
base::Unretained(this), base::Unretained(this),
data, data_length)); data, data_length, encoded_data_length));
} }
void ThreadedDataProvider::ForwardAndACKData(const char* data, void ThreadedDataProvider::ForwardAndACKData(const char* data,
int data_length) { int data_length,
int encoded_data_length) {
DCHECK(background_thread_.isCurrentThread()); DCHECK(background_thread_.isCurrentThread());
// TODO(oysteine): SiteIsolationPolicy needs to be be checked // TODO(oysteine): SiteIsolationPolicy needs to be be checked
...@@ -281,7 +311,34 @@ void ThreadedDataProvider::ForwardAndACKData(const char* data, ...@@ -281,7 +311,34 @@ void ThreadedDataProvider::ForwardAndACKData(const char* data,
// (or earlier on the I/O thread), otherwise once SiteIsolationPolicy does // (or earlier on the I/O thread), otherwise once SiteIsolationPolicy does
// actual blocking as opposed to just UMA logging this will bypass it. // actual blocking as opposed to just UMA logging this will bypass it.
threaded_data_receiver_->acceptData(data, data_length); threaded_data_receiver_->acceptData(data, data_length);
scoped_ptr<std::vector<char>> data_copy;
if (threaded_data_receiver_->needsMainthreadDataCopy()) {
data_copy.reset(new std::vector<char>(data, data + data_length));
}
main_thread_task_runner_->PostTask(FROM_HERE,
base::Bind(&ThreadedDataProvider::DataNotifyForegroundThread,
base::Unretained(this),
base::Passed(&data_copy),
data_length,
encoded_data_length));
ipc_channel_->Send(new ResourceHostMsg_DataReceived_ACK(request_id_)); ipc_channel_->Send(new ResourceHostMsg_DataReceived_ACK(request_id_));
} }
void ThreadedDataProvider::DataNotifyForegroundThread(
scoped_ptr<std::vector<char> > data_copy,
int data_length,
int encoded_data_length) {
if (data_copy) {
DCHECK(threaded_data_receiver_->needsMainthreadDataCopy());
DCHECK_EQ((size_t)data_length, data_copy->size());
}
threaded_data_receiver_->acceptMainthreadDataNotification(
(data_copy && !data_copy->empty()) ? &data_copy->front() : NULL,
data_length, encoded_data_length);
}
} // namespace content } // namespace content
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include "ipc/ipc_channel.h" #include "ipc/ipc_channel.h"
#include "ipc/message_filter.h" #include "ipc/message_filter.h"
struct ResourceMsg_RequestCompleteData;
namespace blink { namespace blink {
class WebThreadedDataReceiver; class WebThreadedDataReceiver;
} }
...@@ -23,6 +25,7 @@ class SyncChannel; ...@@ -23,6 +25,7 @@ class SyncChannel;
} }
namespace content { namespace content {
class ResourceDispatcher;
class WebThreadImpl; class WebThreadImpl;
class ThreadedDataProvider { class ThreadedDataProvider {
...@@ -33,8 +36,9 @@ class ThreadedDataProvider { ...@@ -33,8 +36,9 @@ class ThreadedDataProvider {
linked_ptr<base::SharedMemory> shm_buffer, linked_ptr<base::SharedMemory> shm_buffer,
int shm_size, int shm_size,
scoped_refptr<base::SingleThreadTaskRunner> main_thread_task_runner_); scoped_refptr<base::SingleThreadTaskRunner> main_thread_task_runner_);
virtual ~ThreadedDataProvider();
// Any destruction of this class has to bounce via the background thread to
// ensure all data is flushed; call Stop() to start this process.
void Stop(); void Stop();
void OnReceivedDataOnBackgroundThread(int data_offset, void OnReceivedDataOnBackgroundThread(int data_offset,
int data_length, int data_length,
...@@ -45,11 +49,28 @@ class ThreadedDataProvider { ...@@ -45,11 +49,28 @@ class ThreadedDataProvider {
int encoded_data_length); int encoded_data_length);
void OnResourceMessageFilterAddedMainThread(); void OnResourceMessageFilterAddedMainThread();
void OnRequestCompleteForegroundThread(
base::WeakPtr<ResourceDispatcher> resource_dispatcher,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time);
private: private:
~ThreadedDataProvider();
void DestructOnMainThread();
void StopOnBackgroundThread(); void StopOnBackgroundThread();
void OnResourceMessageFilterAddedBackgroundThread(); void OnResourceMessageFilterAddedBackgroundThread();
void ForwardAndACKData(const char* data, int data_length); void OnRequestCompleteBackgroundThread(
base::WeakPtr<ResourceDispatcher> resource_dispatcher,
const ResourceMsg_RequestCompleteData& request_complete_data,
const base::TimeTicks& renderer_completion_time);
void ForwardAndACKData(const char* data,
int data_length,
int encoded_data_length);
void DataNotifyForegroundThread(
scoped_ptr<std::vector<char> > data_copy,
int data_length,
int encoded_data_length);
scoped_refptr<IPC::MessageFilter> filter_; scoped_refptr<IPC::MessageFilter> filter_;
int request_id_; int request_id_;
...@@ -66,6 +87,7 @@ class ThreadedDataProvider { ...@@ -66,6 +87,7 @@ class ThreadedDataProvider {
struct QueuedSharedMemoryData { struct QueuedSharedMemoryData {
const char* data; const char* data;
int length; int length;
int encoded_length;
}; };
std::vector<QueuedSharedMemoryData> queued_data_; std::vector<QueuedSharedMemoryData> queued_data_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment