Commit bb8a31f2 authored by Misha Efimov's avatar Misha Efimov Committed by Commit Bot

[Cronet] Retain net::IOBuffer when posting UploadDataSink::Read to executor.

Bug: 1010286
Change-Id: I66aa3617ee5e16c751a47d7f36f91925d89107d7
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1835160Reviewed-by: default avatarZhongyi Shi <zhongyi@chromium.org>
Reviewed-by: default avatarPaul Jensen <pauljensen@chromium.org>
Commit-Queue: Paul Jensen <pauljensen@chromium.org>
Auto-Submit: Misha Efimov <mef@chromium.org>
Cr-Commit-Position: refs/heads/master@{#745182}
parent 796f31c9
...@@ -41,7 +41,8 @@ void CronetUploadDataStreamAdapter::InitializeOnNetworkThread( ...@@ -41,7 +41,8 @@ void CronetUploadDataStreamAdapter::InitializeOnNetworkThread(
DCHECK(network_task_runner_); DCHECK(network_task_runner_);
} }
void CronetUploadDataStreamAdapter::Read(net::IOBuffer* buffer, int buf_len) { void CronetUploadDataStreamAdapter::Read(scoped_refptr<net::IOBuffer> buffer,
int buf_len) {
DCHECK(upload_data_stream_); DCHECK(upload_data_stream_);
DCHECK(network_task_runner_); DCHECK(network_task_runner_);
DCHECK(network_task_runner_->BelongsToCurrentThread()); DCHECK(network_task_runner_->BelongsToCurrentThread());
...@@ -52,7 +53,8 @@ void CronetUploadDataStreamAdapter::Read(net::IOBuffer* buffer, int buf_len) { ...@@ -52,7 +53,8 @@ void CronetUploadDataStreamAdapter::Read(net::IOBuffer* buffer, int buf_len) {
// ones used last time. // ones used last time.
if (!(buffer_ && buffer_->io_buffer()->data() == buffer->data() && if (!(buffer_ && buffer_->io_buffer()->data() == buffer->data() &&
buffer_->io_buffer_len() == buf_len)) { buffer_->io_buffer_len() == buf_len)) {
buffer_ = std::make_unique<ByteBufferWithIOBuffer>(env, buffer, buf_len); buffer_ = std::make_unique<ByteBufferWithIOBuffer>(env, std::move(buffer),
buf_len);
} }
Java_CronetUploadDataStream_readData(env, jupload_data_stream_, Java_CronetUploadDataStream_readData(env, jupload_data_stream_,
buffer_->byte_buffer()); buffer_->byte_buffer());
......
...@@ -41,7 +41,7 @@ class CronetUploadDataStreamAdapter : public CronetUploadDataStream::Delegate { ...@@ -41,7 +41,7 @@ class CronetUploadDataStreamAdapter : public CronetUploadDataStream::Delegate {
// CronetUploadDataStream::Delegate implementation. Called on network thread. // CronetUploadDataStream::Delegate implementation. Called on network thread.
void InitializeOnNetworkThread( void InitializeOnNetworkThread(
base::WeakPtr<CronetUploadDataStream> upload_data_stream) override; base::WeakPtr<CronetUploadDataStream> upload_data_stream) override;
void Read(net::IOBuffer* buffer, int buf_len) override; void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len) override;
void Rewind() override; void Rewind() override;
void OnUploadDataStreamDestroyed() override; void OnUploadDataStreamDestroyed() override;
......
...@@ -24,10 +24,11 @@ IOBufferWithByteBuffer::IOBufferWithByteBuffer( ...@@ -24,10 +24,11 @@ IOBufferWithByteBuffer::IOBufferWithByteBuffer(
IOBufferWithByteBuffer::~IOBufferWithByteBuffer() {} IOBufferWithByteBuffer::~IOBufferWithByteBuffer() {}
ByteBufferWithIOBuffer::ByteBufferWithIOBuffer(JNIEnv* env, ByteBufferWithIOBuffer::ByteBufferWithIOBuffer(
net::IOBuffer* io_buffer, JNIEnv* env,
int io_buffer_len) scoped_refptr<net::IOBuffer> io_buffer,
: io_buffer_(io_buffer), io_buffer_len_(io_buffer_len) { int io_buffer_len)
: io_buffer_(std::move(io_buffer)), io_buffer_len_(io_buffer_len) {
// An intermediate ScopedJavaLocalRef is needed here to release the local // An intermediate ScopedJavaLocalRef is needed here to release the local
// reference created by env->NewDirectByteBuffer(). // reference created by env->NewDirectByteBuffer().
base::android::ScopedJavaLocalRef<jobject> java_buffer( base::android::ScopedJavaLocalRef<jobject> java_buffer(
......
...@@ -53,7 +53,7 @@ class IOBufferWithByteBuffer : public net::WrappedIOBuffer { ...@@ -53,7 +53,7 @@ class IOBufferWithByteBuffer : public net::WrappedIOBuffer {
class ByteBufferWithIOBuffer { class ByteBufferWithIOBuffer {
public: public:
ByteBufferWithIOBuffer(JNIEnv* env, ByteBufferWithIOBuffer(JNIEnv* env,
net::IOBuffer* io_buffer, scoped_refptr<net::IOBuffer> io_buffer,
int io_buffer_len); int io_buffer_len);
~ByteBufferWithIOBuffer(); ~ByteBufferWithIOBuffer();
......
...@@ -67,7 +67,8 @@ int CronetUploadDataStream::ReadInternal(net::IOBuffer* buf, int buf_len) { ...@@ -67,7 +67,8 @@ int CronetUploadDataStream::ReadInternal(net::IOBuffer* buf, int buf_len) {
read_in_progress_ = true; read_in_progress_ = true;
waiting_on_read_ = true; waiting_on_read_ = true;
at_front_of_stream_ = false; at_front_of_stream_ = false;
delegate_->Read(buf, buf_len); scoped_refptr<net::IOBuffer> buffer(base::WrapRefCounted(buf));
delegate_->Read(std::move(buffer), buf_len);
return net::ERR_IO_PENDING; return net::ERR_IO_PENDING;
} }
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <memory> #include <memory>
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "net/base/upload_data_stream.h" #include "net/base/upload_data_stream.h"
...@@ -36,7 +37,7 @@ class CronetUploadDataStream : public net::UploadDataStream { ...@@ -36,7 +37,7 @@ class CronetUploadDataStream : public net::UploadDataStream {
// Called for each read request. Delegate must respond by calling // Called for each read request. Delegate must respond by calling
// OnReadSuccess on the network thread asynchronous, or failing the request. // OnReadSuccess on the network thread asynchronous, or failing the request.
// Only called when there's no other pending read or rewind operation. // Only called when there's no other pending read or rewind operation.
virtual void Read(net::IOBuffer* buffer, int buf_len) = 0; virtual void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len) = 0;
// Called to rewind the stream. Not called when already at the start of the // Called to rewind the stream. Not called when already at the start of the
// stream. The delegate must respond by calling OnRewindSuccess // stream. The delegate must respond by calling OnRewindSuccess
......
...@@ -42,13 +42,14 @@ Cronet_BufferPtr IOBufferWithCronet_Buffer::Release() { ...@@ -42,13 +42,14 @@ Cronet_BufferPtr IOBufferWithCronet_Buffer::Release() {
return cronet_buffer_.release(); return cronet_buffer_.release();
} }
Cronet_BufferWithIOBuffer::Cronet_BufferWithIOBuffer(net::IOBuffer* io_buffer, Cronet_BufferWithIOBuffer::Cronet_BufferWithIOBuffer(
size_t io_buffer_len) scoped_refptr<net::IOBuffer> io_buffer,
: io_buffer_(io_buffer), size_t io_buffer_len)
: io_buffer_(std::move(io_buffer)),
io_buffer_len_(io_buffer_len), io_buffer_len_(io_buffer_len),
cronet_buffer_(Cronet_Buffer_Create()) { cronet_buffer_(Cronet_Buffer_Create()) {
static base::NoDestructor<Cronet_BufferCallbackUnowned> static_callback; static base::NoDestructor<Cronet_BufferCallbackUnowned> static_callback;
cronet_buffer_->InitWithDataAndCallback(io_buffer->data(), io_buffer_len, cronet_buffer_->InitWithDataAndCallback(io_buffer_->data(), io_buffer_len_,
static_callback.get()); static_callback.get());
} }
......
...@@ -36,14 +36,18 @@ class IOBufferWithCronet_Buffer : public net::WrappedIOBuffer { ...@@ -36,14 +36,18 @@ class IOBufferWithCronet_Buffer : public net::WrappedIOBuffer {
// net::IOBuffer and the Cronet_Buffer object alive until destroyed. // net::IOBuffer and the Cronet_Buffer object alive until destroyed.
class Cronet_BufferWithIOBuffer { class Cronet_BufferWithIOBuffer {
public: public:
Cronet_BufferWithIOBuffer(net::IOBuffer* io_buffer, size_t io_buffer_len); Cronet_BufferWithIOBuffer(scoped_refptr<net::IOBuffer> io_buffer,
size_t io_buffer_len);
~Cronet_BufferWithIOBuffer(); ~Cronet_BufferWithIOBuffer();
const net::IOBuffer* io_buffer() const { return io_buffer_.get(); } const net::IOBuffer* io_buffer() const { return io_buffer_.get(); }
size_t io_buffer_len() const { return io_buffer_len_; } size_t io_buffer_len() const { return io_buffer_len_; }
// Returns pointer to Cronet buffer owned by |this|. // Returns pointer to Cronet buffer owned by |this|.
Cronet_BufferPtr cronet_buffer() { return cronet_buffer_.get(); } Cronet_BufferPtr cronet_buffer() {
CHECK(io_buffer_->HasAtLeastOneRef());
return cronet_buffer_.get();
}
private: private:
scoped_refptr<net::IOBuffer> io_buffer_; scoped_refptr<net::IOBuffer> io_buffer_;
......
...@@ -40,7 +40,7 @@ class Cronet_UploadDataSinkImpl::NetworkTasks ...@@ -40,7 +40,7 @@ class Cronet_UploadDataSinkImpl::NetworkTasks
// CronetUploadDataStream::Delegate implementation: // CronetUploadDataStream::Delegate implementation:
void InitializeOnNetworkThread( void InitializeOnNetworkThread(
base::WeakPtr<CronetUploadDataStream> upload_data_stream) override; base::WeakPtr<CronetUploadDataStream> upload_data_stream) override;
void Read(net::IOBuffer* buffer, int buf_len) override; void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len) override;
void Rewind() override; void Rewind() override;
void OnUploadDataStreamDestroyed() override; void OnUploadDataStreamDestroyed() override;
...@@ -195,7 +195,8 @@ void Cronet_UploadDataSinkImpl::PostCloseToExecutor() { ...@@ -195,7 +195,8 @@ void Cronet_UploadDataSinkImpl::PostCloseToExecutor() {
Cronet_Executor_Execute(upload_data_provider_executor_, runnable); Cronet_Executor_Execute(upload_data_provider_executor_, runnable);
} }
void Cronet_UploadDataSinkImpl::Read(net::IOBuffer* buffer, int buf_len) { void Cronet_UploadDataSinkImpl::Read(scoped_refptr<net::IOBuffer> buffer,
int buf_len) {
if (url_request_->IsDone()) if (url_request_->IsDone())
return; return;
Cronet_UploadDataProviderPtr upload_data_provider = nullptr; Cronet_UploadDataProviderPtr upload_data_provider = nullptr;
...@@ -207,7 +208,8 @@ void Cronet_UploadDataSinkImpl::Read(net::IOBuffer* buffer, int buf_len) { ...@@ -207,7 +208,8 @@ void Cronet_UploadDataSinkImpl::Read(net::IOBuffer* buffer, int buf_len) {
in_which_user_callback_ = READ; in_which_user_callback_ = READ;
upload_data_provider = upload_data_provider_; upload_data_provider = upload_data_provider_;
} }
buffer_ = std::make_unique<Cronet_BufferWithIOBuffer>(buffer, buf_len); buffer_ =
std::make_unique<Cronet_BufferWithIOBuffer>(std::move(buffer), buf_len);
Cronet_UploadDataProvider_Read(upload_data_provider, this, Cronet_UploadDataProvider_Read(upload_data_provider, this,
buffer_->cronet_buffer()); buffer_->cronet_buffer());
} }
...@@ -261,12 +263,13 @@ void Cronet_UploadDataSinkImpl::NetworkTasks::InitializeOnNetworkThread( ...@@ -261,12 +263,13 @@ void Cronet_UploadDataSinkImpl::NetworkTasks::InitializeOnNetworkThread(
base::ThreadTaskRunnerHandle::Get())); base::ThreadTaskRunnerHandle::Get()));
} }
void Cronet_UploadDataSinkImpl::NetworkTasks::Read(net::IOBuffer* buffer, void Cronet_UploadDataSinkImpl::NetworkTasks::Read(
int buf_len) { scoped_refptr<net::IOBuffer> buffer,
int buf_len) {
DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_); DCHECK_CALLED_ON_VALID_THREAD(network_thread_checker_);
PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Read, PostTaskToExecutor(base::BindOnce(&Cronet_UploadDataSinkImpl::Read,
base::Unretained(upload_data_sink_), base::Unretained(upload_data_sink_),
base::Passed(&buffer), buf_len)); std::move(buffer), buf_len));
} }
void Cronet_UploadDataSinkImpl::NetworkTasks::Rewind() { void Cronet_UploadDataSinkImpl::NetworkTasks::Rewind() {
......
...@@ -50,7 +50,7 @@ class Cronet_UploadDataSinkImpl : public Cronet_UploadDataSink { ...@@ -50,7 +50,7 @@ class Cronet_UploadDataSinkImpl : public Cronet_UploadDataSink {
void InitializeUploadDataStream( void InitializeUploadDataStream(
base::WeakPtr<CronetUploadDataStream> upload_data_stream, base::WeakPtr<CronetUploadDataStream> upload_data_stream,
scoped_refptr<base::SingleThreadTaskRunner> network_task_runner); scoped_refptr<base::SingleThreadTaskRunner> network_task_runner);
void Read(net::IOBuffer* buffer, int buf_len); void Read(scoped_refptr<net::IOBuffer> buffer, int buf_len);
void Rewind(); void Rewind();
void Close(); void Close();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment