Commit 11678595 authored by brettw's avatar brettw Committed by Commit bot

Add mojo <--> net IO adapters.

This refactors the net->mojo adapter currently in the URL loader and moves it to the net_adapters file so it can be shared with the tcp_socket. An adapter going in the opposite direction (currently unused) is also added.

Makes some changes to the existing adapter: naming for clarify (since there is now one going in the opposite direction, things were getting pretty confusing), and some of the setup logic was moved into a static function so it can be shared.

Clarify requirements for calling EndWriteData (the old code called this in error cases as well).

Review URL: https://codereview.chromium.org/649783002

Cr-Commit-Position: refs/heads/master@{#299211}
parent 7bc3878b
...@@ -187,9 +187,10 @@ MOJO_SYSTEM_EXPORT MojoResult ...@@ -187,9 +187,10 @@ MOJO_SYSTEM_EXPORT MojoResult
// that thread can then wait for |data_pipe_producer_handle| to become writable // that thread can then wait for |data_pipe_producer_handle| to become writable
// again. // again.
// //
// Once the caller has finished writing data to |*buffer|, it should call // When |MojoBeginWriteData()| returns MOJO_RESULT_OK, and the caller has
// |MojoEndWriteData()| to specify the amount written and to complete the // finished writing data to |*buffer|, it should call |MojoEndWriteData()| to
// two-phase write. // specify the amount written and to complete the two-phase write.
// |MojoEndWriteData()| need not be called for other return values.
// //
// Note: If the data pipe has the "may discard" option flag (specified on // Note: If the data pipe has the "may discard" option flag (specified on
// creation) and |flags| has |MOJO_WRITE_DATA_FLAG_ALL_OR_NONE| set, this may // creation) and |flags| has |MOJO_WRITE_DATA_FLAG_ALL_OR_NONE| set, this may
......
...@@ -8,6 +8,105 @@ ...@@ -8,6 +8,105 @@
namespace mojo { namespace mojo {
namespace {
const uint32_t kMaxBufSize = 64 * 1024;
} // namespace
NetToMojoPendingBuffer::NetToMojoPendingBuffer(
ScopedDataPipeProducerHandle handle,
void* buffer)
: handle_(handle.Pass()),
buffer_(buffer) {
}
NetToMojoPendingBuffer::~NetToMojoPendingBuffer() {
if (handle_.is_valid())
EndWriteDataRaw(handle_.get(), 0);
}
// static
MojoResult NetToMojoPendingBuffer::BeginWrite(
ScopedDataPipeProducerHandle* handle,
scoped_refptr<NetToMojoPendingBuffer>* pending,
uint32_t* num_bytes) {
void* buf;
*num_bytes = 0;
MojoResult result = BeginWriteDataRaw(handle->get(), &buf, num_bytes,
MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK) {
if (*num_bytes > kMaxBufSize)
*num_bytes = kMaxBufSize;
*pending = new NetToMojoPendingBuffer(handle->Pass(), buf);
}
return result;
}
ScopedDataPipeProducerHandle NetToMojoPendingBuffer::Complete(
uint32_t num_bytes) {
EndWriteDataRaw(handle_.get(), num_bytes);
buffer_ = NULL;
return handle_.Pass();
}
// -----------------------------------------------------------------------------
NetToMojoIOBuffer::NetToMojoIOBuffer(
NetToMojoPendingBuffer* pending_buffer)
: net::WrappedIOBuffer(pending_buffer->buffer()),
pending_buffer_(pending_buffer) {
}
NetToMojoIOBuffer::~NetToMojoIOBuffer() {
}
// -----------------------------------------------------------------------------
MojoToNetPendingBuffer::MojoToNetPendingBuffer(
ScopedDataPipeConsumerHandle handle,
const void* buffer)
: handle_(handle.Pass()),
buffer_(buffer) {
}
MojoToNetPendingBuffer::~MojoToNetPendingBuffer() {
}
// static
MojoResult MojoToNetPendingBuffer::BeginRead(
ScopedDataPipeConsumerHandle* handle,
scoped_refptr<MojoToNetPendingBuffer>* pending,
uint32_t* num_bytes) {
const void* buffer = NULL;
*num_bytes = 0;
MojoResult result = BeginReadDataRaw(handle->get(), &buffer, num_bytes,
MOJO_READ_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK)
*pending = new MojoToNetPendingBuffer(handle->Pass(), buffer);
return result;
}
ScopedDataPipeConsumerHandle MojoToNetPendingBuffer::Complete(
uint32_t num_bytes) {
EndReadDataRaw(handle_.get(), num_bytes);
buffer_ = NULL;
return handle_.Pass();
}
// -----------------------------------------------------------------------------
MojoToNetIOBuffer::MojoToNetIOBuffer(MojoToNetPendingBuffer* pending_buffer)
: net::WrappedIOBuffer(pending_buffer->buffer()),
pending_buffer_(pending_buffer) {
}
MojoToNetIOBuffer::~MojoToNetIOBuffer() {
}
// -----------------------------------------------------------------------------
NetworkErrorPtr MakeNetworkError(int error_code) { NetworkErrorPtr MakeNetworkError(int error_code) {
NetworkErrorPtr error = NetworkError::New(); NetworkErrorPtr error = NetworkError::New();
error->code = error_code; error->code = error_code;
......
...@@ -5,10 +5,125 @@ ...@@ -5,10 +5,125 @@
#ifndef MOJO_SERVICES_NETWORK_NET_ADAPTERS_H_ #ifndef MOJO_SERVICES_NETWORK_NET_ADAPTERS_H_
#define MOJO_SERVICES_NETWORK_NET_ADAPTERS_H_ #define MOJO_SERVICES_NETWORK_NET_ADAPTERS_H_
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "mojo/services/public/interfaces/network/network_error.mojom.h" #include "mojo/services/public/interfaces/network/network_error.mojom.h"
#include "net/base/io_buffer.h"
namespace mojo { namespace mojo {
// These adapters are used to transfer data between a Mojo pipe and the net
// library. There are four adapters, one for each end in each direction:
//
// Mojo pipe Data flow Network library
// ----------------------------------------------------------
// MojoToNetPendingBuffer ---> MojoToNetIOBuffer
// NetToMojoPendingBuffer <--- NetToMojoIOBuffer
//
// While the operation is in progress, the Mojo-side objects keep ownership
// of the Mojo pipe, which in turn is kept alive by the IOBuffer. This allows
// the request to potentially outlive the object managing the translation.
// Mojo side of a Net -> Mojo copy. The buffer is allocated by Mojo.
class NetToMojoPendingBuffer
: public base::RefCountedThreadSafe<NetToMojoPendingBuffer> {
public:
// Begins a two-phase write to the data pipe.
//
// On success, MOJO_RESULT_OK will be returned. The ownership of the given
// producer handle will be transferred to the new NetToMojoPendingBuffer that
// will be placed into *pending, and the size of the buffer will be in
// *num_bytes.
//
// On failure or MOJO_RESULT_SHOULD_WAIT, there will be no change to the
// handle, and *pending and *num_bytes will be unused.
static MojoResult BeginWrite(ScopedDataPipeProducerHandle* handle,
scoped_refptr<NetToMojoPendingBuffer>* pending,
uint32_t* num_bytes);
// Called to indicate the buffer is done being written to. Passes ownership
// of the pipe back to the caller.
ScopedDataPipeProducerHandle Complete(uint32_t num_bytes);
char* buffer() { return static_cast<char*>(buffer_); }
private:
friend class base::RefCountedThreadSafe<NetToMojoPendingBuffer>;
// Takes ownership of the handle.
NetToMojoPendingBuffer(ScopedDataPipeProducerHandle handle, void* buffer);
~NetToMojoPendingBuffer();
ScopedDataPipeProducerHandle handle_;
void* buffer_;
DISALLOW_COPY_AND_ASSIGN(NetToMojoPendingBuffer);
};
// Net side of a Net -> Mojo copy. The data will be read from the network and
// copied into the buffer associated with the pending mojo write.
class NetToMojoIOBuffer : public net::WrappedIOBuffer {
public:
explicit NetToMojoIOBuffer(NetToMojoPendingBuffer* pending_buffer);
private:
virtual ~NetToMojoIOBuffer();
scoped_refptr<NetToMojoPendingBuffer> pending_buffer_;
};
// Mojo side of a Mojo -> Net copy.
class MojoToNetPendingBuffer
: public base::RefCountedThreadSafe<MojoToNetPendingBuffer> {
public:
// Starts reading from Mojo.
//
// On success, MOJO_RESULT_OK will be returned. The ownership of the given
// consumer handle will be transferred to the new MojoToNetPendingBuffer that
// will be placed into *pending, and the size of the buffer will be in
// *num_bytes.
//
// On failure or MOJO_RESULT_SHOULD_WAIT, there will be no change to the
// handle, and *pending and *num_bytes will be unused.
static MojoResult BeginRead(ScopedDataPipeConsumerHandle* handle,
scoped_refptr<MojoToNetPendingBuffer>* pending,
uint32_t* num_bytes);
// Indicates the buffer is done being read from. Passes ownership of the pipe
// back to the caller. The argument is the number of bytes actually read,
// since net may do partial writes, which will result in partial reads from
// the Mojo pipe's perspective.
ScopedDataPipeConsumerHandle Complete(uint32_t num_bytes);
const char* buffer() { return static_cast<const char*>(buffer_); }
private:
friend class base::RefCountedThreadSafe<MojoToNetPendingBuffer>;
// Takes ownership of the handle.
explicit MojoToNetPendingBuffer(ScopedDataPipeConsumerHandle handle,
const void* buffer);
~MojoToNetPendingBuffer();
ScopedDataPipeConsumerHandle handle_;
const void* buffer_;
DISALLOW_COPY_AND_ASSIGN(MojoToNetPendingBuffer);
};
// Net side of a Mojo -> Net copy. The data will already be in the
// MojoToNetPendingBuffer's buffer.
class MojoToNetIOBuffer : public net::WrappedIOBuffer {
public:
explicit MojoToNetIOBuffer(MojoToNetPendingBuffer* pending_buffer);
private:
virtual ~MojoToNetIOBuffer();
scoped_refptr<MojoToNetPendingBuffer> pending_buffer_;
};
// Creates a new Mojo network error object from a net error code.
NetworkErrorPtr MakeNetworkError(int error_code); NetworkErrorPtr MakeNetworkError(int error_code);
} // namespace mojo } // namespace mojo
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
namespace mojo { namespace mojo {
namespace { namespace {
const uint32_t kMaxReadSize = 64 * 1024;
// Generates an URLResponsePtr from the response state of a net::URLRequest. // Generates an URLResponsePtr from the response state of a net::URLRequest.
URLResponsePtr MakeURLResponse(const net::URLRequest* url_request) { URLResponsePtr MakeURLResponse(const net::URLRequest* url_request) {
URLResponsePtr response(URLResponse::New()); URLResponsePtr response(URLResponse::New());
...@@ -99,59 +97,6 @@ class UploadDataPipeElementReader : public net::UploadElementReader { ...@@ -99,59 +97,6 @@ class UploadDataPipeElementReader : public net::UploadElementReader {
} // namespace } // namespace
// Keeps track of a pending two-phase write on a DataPipeProducerHandle.
class URLLoaderImpl::PendingWriteToDataPipe :
public base::RefCountedThreadSafe<PendingWriteToDataPipe> {
public:
explicit PendingWriteToDataPipe(ScopedDataPipeProducerHandle handle)
: handle_(handle.Pass()),
buffer_(NULL) {
}
MojoResult BeginWrite(uint32_t* num_bytes) {
MojoResult result = BeginWriteDataRaw(handle_.get(), &buffer_, num_bytes,
MOJO_WRITE_DATA_FLAG_NONE);
if (*num_bytes > kMaxReadSize)
*num_bytes = kMaxReadSize;
return result;
}
ScopedDataPipeProducerHandle Complete(uint32_t num_bytes) {
EndWriteDataRaw(handle_.get(), num_bytes);
buffer_ = NULL;
return handle_.Pass();
}
char* buffer() { return static_cast<char*>(buffer_); }
private:
friend class base::RefCountedThreadSafe<PendingWriteToDataPipe>;
~PendingWriteToDataPipe() {
if (handle_.is_valid())
EndWriteDataRaw(handle_.get(), 0);
}
ScopedDataPipeProducerHandle handle_;
void* buffer_;
DISALLOW_COPY_AND_ASSIGN(PendingWriteToDataPipe);
};
// Takes ownership of a pending two-phase write on a DataPipeProducerHandle,
// and makes its buffer available as a net::IOBuffer.
class URLLoaderImpl::DependentIOBuffer : public net::WrappedIOBuffer {
public:
DependentIOBuffer(PendingWriteToDataPipe* pending_write)
: net::WrappedIOBuffer(pending_write->buffer()),
pending_write_(pending_write) {
}
private:
virtual ~DependentIOBuffer() {}
scoped_refptr<PendingWriteToDataPipe> pending_write_;
};
URLLoaderImpl::URLLoaderImpl(NetworkContext* context) URLLoaderImpl::URLLoaderImpl(NetworkContext* context)
: context_(context), : context_(context),
response_body_buffer_size_(0), response_body_buffer_size_(0),
...@@ -312,44 +257,32 @@ void URLLoaderImpl::OnResponseBodyStreamReady(MojoResult result) { ...@@ -312,44 +257,32 @@ void URLLoaderImpl::OnResponseBodyStreamReady(MojoResult result) {
ReadMore(); ReadMore();
} }
void URLLoaderImpl::WaitToReadMore() {
handle_watcher_.Start(response_body_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE,
MOJO_DEADLINE_INDEFINITE,
base::Bind(&URLLoaderImpl::OnResponseBodyStreamReady,
weak_ptr_factory_.GetWeakPtr()));
}
void URLLoaderImpl::ReadMore() { void URLLoaderImpl::ReadMore() {
DCHECK(!pending_write_.get()); DCHECK(!pending_write_.get());
pending_write_ = new PendingWriteToDataPipe(response_body_stream_.Pass());
uint32_t num_bytes; uint32_t num_bytes;
MojoResult result = pending_write_->BeginWrite(&num_bytes); MojoResult result = NetToMojoPendingBuffer::BeginWrite(
&response_body_stream_, &pending_write_, &num_bytes);
if (result == MOJO_RESULT_SHOULD_WAIT) { if (result == MOJO_RESULT_SHOULD_WAIT) {
// The pipe is full. We need to wait for it to have more space. // The pipe is full. We need to wait for it to have more space.
response_body_stream_ = pending_write_->Complete(num_bytes); handle_watcher_.Start(response_body_stream_.get(),
pending_write_ = NULL; MOJO_HANDLE_SIGNAL_WRITABLE,
WaitToReadMore(); MOJO_DEADLINE_INDEFINITE,
base::Bind(&URLLoaderImpl::OnResponseBodyStreamReady,
weak_ptr_factory_.GetWeakPtr()));
return; return;
} } else if (result != MOJO_RESULT_OK) {
if (result != MOJO_RESULT_OK) {
// The response body stream is in a bad state. Bail. // The response body stream is in a bad state. Bail.
// TODO(darin): How should this be communicated to our client? // TODO(darin): How should this be communicated to our client?
return; return;
} }
CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes); CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
scoped_refptr<net::IOBuffer> buf = scoped_refptr<net::IOBuffer> buf(new NetToMojoIOBuffer(pending_write_.get()));
new DependentIOBuffer(pending_write_.get());
int bytes_read; int bytes_read;
url_request_->Read(buf.get(), static_cast<int>(num_bytes), &bytes_read); url_request_->Read(buf.get(), static_cast<int>(num_bytes), &bytes_read);
// Drop our reference to the buffer.
buf = NULL;
if (url_request_->status().is_io_pending()) { if (url_request_->status().is_io_pending()) {
// Wait for OnReadCompleted. // Wait for OnReadCompleted.
} else if (url_request_->status().is_success() && bytes_read > 0) { } else if (url_request_->status().is_success() && bytes_read > 0) {
......
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
#include "net/url_request/url_request.h" #include "net/url_request/url_request.h"
namespace mojo { namespace mojo {
class NetworkContext; class NetworkContext;
class NetToMojoPendingBuffer;
class URLLoaderImpl : public InterfaceImpl<URLLoader>, class URLLoaderImpl : public InterfaceImpl<URLLoader>,
public net::URLRequest::Delegate { public net::URLRequest::Delegate {
...@@ -23,9 +25,6 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>, ...@@ -23,9 +25,6 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>,
virtual ~URLLoaderImpl(); virtual ~URLLoaderImpl();
private: private:
class PendingWriteToDataPipe;
class DependentIOBuffer;
// URLLoader methods: // URLLoader methods:
virtual void Start( virtual void Start(
URLRequestPtr request, URLRequestPtr request,
...@@ -48,7 +47,6 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>, ...@@ -48,7 +47,6 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>,
const Callback<void(URLResponsePtr)>& callback); const Callback<void(URLResponsePtr)>& callback);
void SendResponse(URLResponsePtr response); void SendResponse(URLResponsePtr response);
void OnResponseBodyStreamReady(MojoResult result); void OnResponseBodyStreamReady(MojoResult result);
void WaitToReadMore();
void ReadMore(); void ReadMore();
void DidRead(uint32_t num_bytes, bool completed_synchronously); void DidRead(uint32_t num_bytes, bool completed_synchronously);
...@@ -56,7 +54,7 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>, ...@@ -56,7 +54,7 @@ class URLLoaderImpl : public InterfaceImpl<URLLoader>,
scoped_ptr<net::URLRequest> url_request_; scoped_ptr<net::URLRequest> url_request_;
Callback<void(URLResponsePtr)> callback_; Callback<void(URLResponsePtr)> callback_;
ScopedDataPipeProducerHandle response_body_stream_; ScopedDataPipeProducerHandle response_body_stream_;
scoped_refptr<PendingWriteToDataPipe> pending_write_; scoped_refptr<NetToMojoPendingBuffer> pending_write_;
common::HandleWatcher handle_watcher_; common::HandleWatcher handle_watcher_;
uint32 response_body_buffer_size_; uint32 response_body_buffer_size_;
bool auto_follow_redirects_; bool auto_follow_redirects_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment