Commit 485f9a6b authored by sergeyu@chromium.org's avatar sergeyu@chromium.org

Fix P2PSocketHostTcp to handle async write correctly.

Previously P2PSocketHostTcp was calling Write() even when another write is
pending.

BUG=232046

Review URL: https://codereview.chromium.org/13926013

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194628 0039d316-1c4b-4281-b951-d872f2087c98
parent dd9cc622
...@@ -21,6 +21,7 @@ namespace content { ...@@ -21,6 +21,7 @@ namespace content {
P2PSocketHostTcp::P2PSocketHostTcp(IPC::Sender* message_sender, int id) P2PSocketHostTcp::P2PSocketHostTcp(IPC::Sender* message_sender, int id)
: P2PSocketHost(message_sender, id), : P2PSocketHost(message_sender, id),
write_pending_(false),
connected_(false) { connected_(false) {
} }
...@@ -226,7 +227,7 @@ void P2PSocketHostTcp::Send(const net::IPEndPoint& to, ...@@ -226,7 +227,7 @@ void P2PSocketHostTcp::Send(const net::IPEndPoint& to,
} }
void P2PSocketHostTcp::DoWrite() { void P2PSocketHostTcp::DoWrite() {
while (write_buffer_ && state_ == STATE_OPEN) { while (write_buffer_ && state_ == STATE_OPEN && !write_pending_) {
int result = socket_->Write(write_buffer_, write_buffer_->BytesRemaining(), int result = socket_->Write(write_buffer_, write_buffer_->BytesRemaining(),
base::Bind(&P2PSocketHostTcp::OnWritten, base::Bind(&P2PSocketHostTcp::OnWritten,
base::Unretained(this))); base::Unretained(this)));
...@@ -235,7 +236,10 @@ void P2PSocketHostTcp::DoWrite() { ...@@ -235,7 +236,10 @@ void P2PSocketHostTcp::DoWrite() {
} }
void P2PSocketHostTcp::OnWritten(int result) { void P2PSocketHostTcp::OnWritten(int result) {
DCHECK(write_pending_);
DCHECK_NE(result, net::ERR_IO_PENDING); DCHECK_NE(result, net::ERR_IO_PENDING);
write_pending_ = false;
HandleWriteResult(result); HandleWriteResult(result);
DoWrite(); DoWrite();
} }
...@@ -253,7 +257,9 @@ void P2PSocketHostTcp::HandleWriteResult(int result) { ...@@ -253,7 +257,9 @@ void P2PSocketHostTcp::HandleWriteResult(int result) {
write_queue_.pop(); write_queue_.pop();
} }
} }
} else if (result != net::ERR_IO_PENDING) { } else if (result == net::ERR_IO_PENDING) {
write_pending_ = true;
} else {
LOG(ERROR) << "Error when sending data in TCP socket: " << result; LOG(ERROR) << "Error when sending data in TCP socket: " << result;
OnError(); OnError();
} }
......
...@@ -65,6 +65,7 @@ class CONTENT_EXPORT P2PSocketHostTcp : public P2PSocketHost { ...@@ -65,6 +65,7 @@ class CONTENT_EXPORT P2PSocketHostTcp : public P2PSocketHost {
std::queue<scoped_refptr<net::DrainableIOBuffer> > write_queue_; std::queue<scoped_refptr<net::DrainableIOBuffer> > write_queue_;
scoped_refptr<net::DrainableIOBuffer> write_buffer_; scoped_refptr<net::DrainableIOBuffer> write_buffer_;
bool write_pending_;
bool connected_; bool connected_;
......
...@@ -181,4 +181,34 @@ TEST_F(P2PSocketHostTcpTest, SendAfterStunRequest) { ...@@ -181,4 +181,34 @@ TEST_F(P2PSocketHostTcpTest, SendAfterStunRequest) {
EXPECT_EQ(expected_data, sent_data_); EXPECT_EQ(expected_data, sent_data_);
} }
// Verify that asynchronous writes are handled correctly.
TEST_F(P2PSocketHostTcpTest, AsyncWrites) {
MessageLoop message_loop;
socket_->set_async_write(true);
EXPECT_CALL(sender_, Send(
MatchMessage(static_cast<uint32>(P2PMsg_OnSendComplete::ID))))
.Times(2)
.WillRepeatedly(DoAll(DeleteArg<0>(), Return(true)));
std::vector<char> packet1;
CreateStunRequest(&packet1);
socket_host_->Send(dest_, packet1);
std::vector<char> packet2;
CreateStunResponse(&packet2);
socket_host_->Send(dest_, packet2);
message_loop.RunUntilIdle();
std::string expected_data;
expected_data.append(IntToSize(packet1.size()));
expected_data.append(packet1.begin(), packet1.end());
expected_data.append(IntToSize(packet2.size()));
expected_data.append(packet2.begin(), packet2.end());
EXPECT_EQ(expected_data, sent_data_);
}
} // namespace content } // namespace content
...@@ -7,10 +7,13 @@ ...@@ -7,10 +7,13 @@
#include <vector> #include <vector>
#include "base/location.h"
#include "base/single_thread_task_runner.h"
#include "base/sys_byteorder.h" #include "base/sys_byteorder.h"
#include "base/thread_task_runner_handle.h"
#include "content/common/p2p_messages.h" #include "content/common/p2p_messages.h"
#include "ipc/ipc_sender.h"
#include "ipc/ipc_message_utils.h" #include "ipc/ipc_message_utils.h"
#include "ipc/ipc_sender.h"
#include "net/base/address_list.h" #include "net/base/address_list.h"
#include "net/base/completion_callback.h" #include "net/base/completion_callback.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
...@@ -49,6 +52,7 @@ class FakeSocket : public net::StreamSocket { ...@@ -49,6 +52,7 @@ class FakeSocket : public net::StreamSocket {
FakeSocket(std::string* written_data); FakeSocket(std::string* written_data);
virtual ~FakeSocket(); virtual ~FakeSocket();
void set_async_write(bool async_write) { async_write_ = async_write; }
void AppendInputData(const char* data, int data_size); void AppendInputData(const char* data, int data_size);
int input_pos() const { return input_pos_; } int input_pos() const { return input_pos_; }
bool read_pending() const { return read_pending_; } bool read_pending() const { return read_pending_; }
...@@ -78,15 +82,21 @@ class FakeSocket : public net::StreamSocket { ...@@ -78,15 +82,21 @@ class FakeSocket : public net::StreamSocket {
virtual bool GetSSLInfo(net::SSLInfo* ssl_info) OVERRIDE; virtual bool GetSSLInfo(net::SSLInfo* ssl_info) OVERRIDE;
private: private:
void DoAsyncWrite(scoped_refptr<net::IOBuffer> buf, int buf_len,
const net::CompletionCallback& callback);
bool read_pending_; bool read_pending_;
scoped_refptr<net::IOBuffer> read_buffer_; scoped_refptr<net::IOBuffer> read_buffer_;
int read_buffer_size_; int read_buffer_size_;
net::CompletionCallback read_callback_; net::CompletionCallback read_callback_;
std::string* written_data_;
std::string input_data_; std::string input_data_;
int input_pos_; int input_pos_;
std::string* written_data_;
bool async_write_;
bool write_pending_;
net::IPEndPoint peer_address_; net::IPEndPoint peer_address_;
net::IPEndPoint local_address_; net::IPEndPoint local_address_;
...@@ -95,8 +105,10 @@ class FakeSocket : public net::StreamSocket { ...@@ -95,8 +105,10 @@ class FakeSocket : public net::StreamSocket {
FakeSocket::FakeSocket(std::string* written_data) FakeSocket::FakeSocket(std::string* written_data)
: read_pending_(false), : read_pending_(false),
input_pos_(0),
written_data_(written_data), written_data_(written_data),
input_pos_(0) { async_write_(false),
write_pending_(false) {
} }
FakeSocket::~FakeSocket() { } FakeSocket::~FakeSocket() { }
...@@ -147,6 +159,17 @@ int FakeSocket::Read(net::IOBuffer* buf, int buf_len, ...@@ -147,6 +159,17 @@ int FakeSocket::Read(net::IOBuffer* buf, int buf_len,
int FakeSocket::Write(net::IOBuffer* buf, int buf_len, int FakeSocket::Write(net::IOBuffer* buf, int buf_len,
const net::CompletionCallback& callback) { const net::CompletionCallback& callback) {
DCHECK(buf); DCHECK(buf);
DCHECK(!write_pending_);
if (async_write_) {
base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, base::Bind(
&FakeSocket::DoAsyncWrite, base::Unretained(this),
scoped_refptr<net::IOBuffer>(buf), buf_len, callback));
write_pending_ = true;
return net::ERR_IO_PENDING;
}
if (written_data_) { if (written_data_) {
written_data_->insert(written_data_->end(), written_data_->insert(written_data_->end(),
buf->data(), buf->data() + buf_len); buf->data(), buf->data() + buf_len);
...@@ -154,6 +177,16 @@ int FakeSocket::Write(net::IOBuffer* buf, int buf_len, ...@@ -154,6 +177,16 @@ int FakeSocket::Write(net::IOBuffer* buf, int buf_len,
return buf_len; return buf_len;
} }
void FakeSocket::DoAsyncWrite(scoped_refptr<net::IOBuffer> buf, int buf_len,
const net::CompletionCallback& callback) {
write_pending_ = false;
if (written_data_) {
written_data_->insert(written_data_->end(),
buf->data(), buf->data() + buf_len);
}
callback.Run(buf_len);
}
bool FakeSocket::SetReceiveBufferSize(int32 size) { bool FakeSocket::SetReceiveBufferSize(int32 size) {
NOTIMPLEMENTED(); NOTIMPLEMENTED();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment