Commit 5a5ee680 authored by hclam's avatar hclam Committed by Commit bot

UDP: Windows implementation using non-blocking IO

Non-blocking IO has higher throughput than the previous implementation
that used overlapped IO. 15% increase in throughput for sending to
localhost is observed. There could be additional performance advantage
in WiFi network.

All existing UDP socket tests passed. I added a unit test to do
benchmarking. Also tested this manually with WebRTC and verified that
all UDP send and receive activities are logged correctly.

BUG=442392

Review URL: https://codereview.chromium.org/861963002

Cr-Commit-Position: refs/heads/master@{#314752}
parent a81bbedf
......@@ -837,6 +837,7 @@
'cookies/cookie_monster_perftest.cc',
'disk_cache/blockfile/disk_cache_perftest.cc',
'proxy/proxy_resolver_perftest.cc',
'udp/udp_socket_perftest.cc',
'websockets/websocket_frame_perftest.cc',
],
'conditions': [
......
......@@ -62,4 +62,10 @@ const BoundNetLog& UDPClientSocket::NetLog() const {
return socket_.NetLog();
}
#if defined(OS_WIN)
void UDPClientSocket::UseNonBlockingIO() {
socket_.UseNonBlockingIO();
}
#endif
} // namespace net
......@@ -38,6 +38,12 @@ class NET_EXPORT_PRIVATE UDPClientSocket : public DatagramClientSocket {
int SetSendBufferSize(int32 size) override;
const BoundNetLog& NetLog() const override;
#if defined(OS_WIN)
// Switch to use non-blocking IO. Must be called right after construction and
// before other calls.
void UseNonBlockingIO();
#endif
private:
UDPSocket socket_;
DISALLOW_COPY_AND_ASSIGN(UDPClientSocket);
......
......@@ -120,4 +120,10 @@ void UDPServerSocket::DetachFromThread() {
socket_.DetachFromThread();
}
#if defined(OS_WIN)
void UDPServerSocket::UseNonBlockingIO() {
socket_.UseNonBlockingIO();
}
#endif
} // namespace net
......@@ -47,6 +47,12 @@ class NET_EXPORT UDPServerSocket : public DatagramServerSocket {
int SetDiffServCodePoint(DiffServCodePoint dscp) override;
void DetachFromThread() override;
#if defined(OS_WIN)
// Switch to use non-blocking IO. Must be called right after construction and
// before other calls.
void UseNonBlockingIO();
#endif
private:
UDPSocket socket_;
bool allow_address_reuse_;
......
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "base/test/perf_time_logger.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_endpoint.h"
#include "net/base/net_errors.h"
#include "net/base/net_log_unittest.h"
#include "net/base/net_util.h"
#include "net/base/test_completion_callback.h"
#include "net/test/net_test_suite.h"
#include "net/udp/udp_client_socket.h"
#include "net/udp/udp_server_socket.h"
#include "net/udp/udp_socket.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
namespace {
class UDPSocketPerfTest : public PlatformTest {
public:
UDPSocketPerfTest()
: buffer_(new net::IOBufferWithSize(kPacketSize)), weak_factory_(this) {}
void DoneWritePacketsToSocket(net::UDPClientSocket* socket,
int num_of_packets,
base::Closure done_callback,
int error) {
WritePacketsToSocket(socket, num_of_packets, done_callback);
}
// Send |num_of_packets| to |socket|. Invoke |done_callback| when done.
void WritePacketsToSocket(net::UDPClientSocket* socket,
int num_of_packets,
base::Closure done_callback);
// Use non-blocking IO if |use_nonblocking_io| is true. This variable only
// has effect on Windows.
void WriteBenchmark(bool use_nonblocking_io);
protected:
static const int kPacketSize = 1024;
scoped_refptr<net::IOBufferWithSize> buffer_;
base::WeakPtrFactory<UDPSocketPerfTest> weak_factory_;
};
// Creates and address from an ip/port and returns it in |address|.
void CreateUDPAddress(std::string ip_str,
uint16 port,
net::IPEndPoint* address) {
net::IPAddressNumber ip_number;
bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number);
if (!rv)
return;
*address = net::IPEndPoint(ip_number, port);
}
void UDPSocketPerfTest::WritePacketsToSocket(net::UDPClientSocket* socket,
int num_of_packets,
base::Closure done_callback) {
scoped_refptr<net::IOBufferWithSize> io_buffer(
new net::IOBufferWithSize(kPacketSize));
memset(io_buffer->data(), 'G', kPacketSize);
while (num_of_packets) {
int rv =
socket->Write(io_buffer.get(), io_buffer->size(),
base::Bind(&UDPSocketPerfTest::DoneWritePacketsToSocket,
weak_factory_.GetWeakPtr(), socket,
num_of_packets - 1, done_callback));
if (rv == net::ERR_IO_PENDING)
break;
--num_of_packets;
}
if (!num_of_packets) {
done_callback.Run();
return;
}
}
void UDPSocketPerfTest::WriteBenchmark(bool use_nonblocking_io) {
base::MessageLoopForIO message_loop;
const uint16 kPort = 9999;
// Setup the server to listen.
net::IPEndPoint bind_address;
CreateUDPAddress("127.0.0.1", kPort, &bind_address);
net::CapturingNetLog server_log;
scoped_ptr<net::UDPServerSocket> server(
new net::UDPServerSocket(&server_log, net::NetLog::Source()));
#if defined(OS_WIN)
if (use_nonblocking_io)
server->UseNonBlockingIO();
#endif
int rv = server->Listen(bind_address);
ASSERT_EQ(net::OK, rv);
// Setup the client.
net::IPEndPoint server_address;
CreateUDPAddress("127.0.0.1", kPort, &server_address);
net::CapturingNetLog client_log;
scoped_ptr<net::UDPClientSocket> client(new net::UDPClientSocket(
net::DatagramSocket::DEFAULT_BIND, net::RandIntCallback(), &client_log,
net::NetLog::Source()));
#if defined(OS_WIN)
if (use_nonblocking_io)
client->UseNonBlockingIO();
#endif
rv = client->Connect(server_address);
EXPECT_EQ(net::OK, rv);
base::RunLoop run_loop;
base::TimeTicks start_ticks = base::TimeTicks::Now();
int packets = 100000;
client->SetSendBufferSize(1024);
WritePacketsToSocket(client.get(), packets, run_loop.QuitClosure());
run_loop.Run();
double elapsed = (base::TimeTicks::Now() - start_ticks).InSecondsF();
LOG(INFO) << "Write speed: " << packets / 1024 / elapsed << " MB/s";
}
TEST_F(UDPSocketPerfTest, Write) {
base::PerfTimeLogger timer("UDP_socket_write");
WriteBenchmark(false);
}
#if defined(OS_WIN)
TEST_F(UDPSocketPerfTest, WriteNonBlocking) {
base::PerfTimeLogger timer("UDP_socket_write_nonblocking");
WriteBenchmark(true);
}
#endif
} // namespace
......@@ -9,7 +9,10 @@
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
#include "base/run_loop.h"
#include "base/stl_util.h"
#include "net/base/io_buffer.h"
#include "net/base/ip_endpoint.h"
......@@ -27,9 +30,7 @@ namespace {
class UDPSocketTest : public PlatformTest {
public:
UDPSocketTest()
: buffer_(new IOBufferWithSize(kMaxRead)) {
}
UDPSocketTest() : buffer_(new IOBufferWithSize(kMaxRead)) {}
// Blocks until data is read from the socket.
std::string RecvFromSocket(UDPServerSocket* socket) {
......@@ -112,22 +113,36 @@ class UDPSocketTest : public PlatformTest {
return bytes_sent;
}
protected:
static const int kMaxRead = 1024;
scoped_refptr<IOBufferWithSize> buffer_;
IPEndPoint recv_from_address_;
};
void WriteSocketIgnoreResult(UDPClientSocket* socket, std::string msg) {
WriteSocket(socket, msg);
}
// Creates and address from an ip/port and returns it in |address|.
void CreateUDPAddress(std::string ip_str, uint16 port, IPEndPoint* address) {
// Creates an address from ip address and port and writes it to |*address|.
void CreateUDPAddress(std::string ip_str, uint16 port, IPEndPoint* address) {
IPAddressNumber ip_number;
bool rv = ParseIPLiteralToNumber(ip_str, &ip_number);
if (!rv)
return;
*address = IPEndPoint(ip_number, port);
}
// Run unit test for a connection test.
// |use_nonblocking_io| is used to switch between overlapped and non-blocking
// IO on Windows. It has no effect in other ports.
void ConnectTest(bool use_nonblocking_io);
protected:
static const int kMaxRead = 1024;
scoped_refptr<IOBufferWithSize> buffer_;
IPEndPoint recv_from_address_;
};
void ReadCompleteCallback(int* result_out, base::Closure callback, int result) {
*result_out = result;
callback.Run();
}
TEST_F(UDPSocketTest, Connect) {
void UDPSocketTest::ConnectTest(bool use_nonblocking_io) {
const uint16 kPort = 9999;
std::string simple_message("hello world!");
......@@ -137,6 +152,10 @@ TEST_F(UDPSocketTest, Connect) {
CapturingNetLog server_log;
scoped_ptr<UDPServerSocket> server(
new UDPServerSocket(&server_log, NetLog::Source()));
#if defined(OS_WIN)
if (use_nonblocking_io)
server->UseNonBlockingIO();
#endif
server->AllowAddressReuse();
int rv = server->Listen(bind_address);
ASSERT_EQ(OK, rv);
......@@ -146,10 +165,13 @@ TEST_F(UDPSocketTest, Connect) {
CreateUDPAddress("127.0.0.1", kPort, &server_address);
CapturingNetLog client_log;
scoped_ptr<UDPClientSocket> client(
new UDPClientSocket(DatagramSocket::DEFAULT_BIND,
RandIntCallback(),
&client_log,
NetLog::Source()));
new UDPClientSocket(DatagramSocket::DEFAULT_BIND, RandIntCallback(),
&client_log, NetLog::Source()));
#if defined(OS_WIN)
if (use_nonblocking_io)
client->UseNonBlockingIO();
#endif
rv = client->Connect(server_address);
EXPECT_EQ(OK, rv);
......@@ -169,6 +191,23 @@ TEST_F(UDPSocketTest, Connect) {
str = ReadSocket(client.get());
DCHECK(simple_message == str);
// Test asynchronous read. Server waits for message.
base::RunLoop run_loop;
int read_result = 0;
rv = server->RecvFrom(
buffer_.get(), kMaxRead, &recv_from_address_,
base::Bind(&ReadCompleteCallback, &read_result, run_loop.QuitClosure()));
EXPECT_EQ(ERR_IO_PENDING, rv);
// Client sends to the server.
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&UDPSocketTest::WriteSocketIgnoreResult,
base::Unretained(this), client.get(), simple_message));
run_loop.Run();
EXPECT_EQ(simple_message.length(), static_cast<size_t>(read_result));
EXPECT_EQ(simple_message, std::string(buffer_->data(), read_result));
// Delete sockets so they log their final events.
server.reset();
client.reset();
......@@ -176,34 +215,48 @@ TEST_F(UDPSocketTest, Connect) {
// Check the server's log.
CapturingNetLog::CapturedEntryList server_entries;
server_log.GetEntries(&server_entries);
EXPECT_EQ(4u, server_entries.size());
EXPECT_TRUE(LogContainsBeginEvent(
server_entries, 0, NetLog::TYPE_SOCKET_ALIVE));
EXPECT_EQ(5u, server_entries.size());
EXPECT_TRUE(
LogContainsBeginEvent(server_entries, 0, NetLog::TYPE_SOCKET_ALIVE));
EXPECT_TRUE(LogContainsEvent(
server_entries, 1, NetLog::TYPE_UDP_BYTES_RECEIVED, NetLog::PHASE_NONE));
EXPECT_TRUE(LogContainsEvent(server_entries, 2, NetLog::TYPE_UDP_BYTES_SENT,
NetLog::PHASE_NONE));
EXPECT_TRUE(LogContainsEvent(
server_entries, 2, NetLog::TYPE_UDP_BYTES_SENT, NetLog::PHASE_NONE));
EXPECT_TRUE(LogContainsEndEvent(
server_entries, 3, NetLog::TYPE_SOCKET_ALIVE));
server_entries, 3, NetLog::TYPE_UDP_BYTES_RECEIVED, NetLog::PHASE_NONE));
EXPECT_TRUE(
LogContainsEndEvent(server_entries, 4, NetLog::TYPE_SOCKET_ALIVE));
// Check the client's log.
CapturingNetLog::CapturedEntryList client_entries;
client_log.GetEntries(&client_entries);
EXPECT_EQ(6u, client_entries.size());
EXPECT_TRUE(LogContainsBeginEvent(
client_entries, 0, NetLog::TYPE_SOCKET_ALIVE));
EXPECT_TRUE(LogContainsBeginEvent(
client_entries, 1, NetLog::TYPE_UDP_CONNECT));
EXPECT_TRUE(LogContainsEndEvent(
client_entries, 2, NetLog::TYPE_UDP_CONNECT));
EXPECT_TRUE(LogContainsEvent(
client_entries, 3, NetLog::TYPE_UDP_BYTES_SENT, NetLog::PHASE_NONE));
EXPECT_EQ(7u, client_entries.size());
EXPECT_TRUE(
LogContainsBeginEvent(client_entries, 0, NetLog::TYPE_SOCKET_ALIVE));
EXPECT_TRUE(
LogContainsBeginEvent(client_entries, 1, NetLog::TYPE_UDP_CONNECT));
EXPECT_TRUE(LogContainsEndEvent(client_entries, 2, NetLog::TYPE_UDP_CONNECT));
EXPECT_TRUE(LogContainsEvent(client_entries, 3, NetLog::TYPE_UDP_BYTES_SENT,
NetLog::PHASE_NONE));
EXPECT_TRUE(LogContainsEvent(
client_entries, 4, NetLog::TYPE_UDP_BYTES_RECEIVED, NetLog::PHASE_NONE));
EXPECT_TRUE(LogContainsEndEvent(
client_entries, 5, NetLog::TYPE_SOCKET_ALIVE));
EXPECT_TRUE(LogContainsEvent(client_entries, 5, NetLog::TYPE_UDP_BYTES_SENT,
NetLog::PHASE_NONE));
EXPECT_TRUE(
LogContainsEndEvent(client_entries, 6, NetLog::TYPE_SOCKET_ALIVE));
}
TEST_F(UDPSocketTest, Connect) {
// The variable |use_nonblocking_io| has no effect in non-Windows ports.
ConnectTest(false);
}
#if defined(OS_WIN)
TEST_F(UDPSocketTest, ConnectNonBlocking) {
ConnectTest(true);
}
#endif
#if defined(OS_MACOSX)
// UDPSocketPrivate_Broadcast is disabled for OSX because it requires
// root permissions on OSX 10.7+.
......
This diff is collapsed.
......@@ -12,6 +12,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/threading/non_thread_safe.h"
#include "base/win/object_watcher.h"
#include "base/win/scoped_handle.h"
#include "net/base/address_family.h"
#include "net/base/completion_callback.h"
#include "net/base/net_export.h"
......@@ -23,7 +24,9 @@
namespace net {
class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
class NET_EXPORT UDPSocketWin
: NON_EXPORTED_BASE(public base::NonThreadSafe),
NON_EXPORTED_BASE(public base::win::ObjectWatcher::Delegate) {
public:
UDPSocketWin(DatagramSocket::BindType bind_type,
const RandIntCallback& rand_int_cb,
......@@ -174,6 +177,10 @@ class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
// Resets the thread to be used for thread-safety checks.
void DetachFromThread();
// This class by default uses overlapped IO. Call this method before Open()
// to switch to non-blocking IO.
void UseNonBlockingIO();
private:
enum SocketOptions {
SOCKET_OPTION_MULTICAST_LOOP = 1 << 0
......@@ -183,13 +190,20 @@ class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
void DoReadCallback(int rv);
void DoWriteCallback(int rv);
void DidCompleteRead();
void DidCompleteWrite();
// base::ObjectWatcher::Delegate implementation.
virtual void OnObjectSignaled(HANDLE object);
void OnReadSignaled();
void OnWriteSignaled();
void WatchForReadWrite();
// Handles stats and logging. |result| is the number of bytes transferred, on
// success, or the net error code on failure. LogRead retrieves the address
// from |recv_addr_storage_|, while LogWrite takes it as an optional argument.
void LogRead(int result, const char* bytes) const;
// success, or the net error code on failure.
void LogRead(int result, const char* bytes, const IPEndPoint* address) const;
void LogWrite(int result, const char* bytes, const IPEndPoint* address) const;
// Same as SendTo(), except that address is passed by pointer
......@@ -201,8 +215,22 @@ class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
const CompletionCallback& callback);
int InternalConnect(const IPEndPoint& address);
int InternalRecvFrom(IOBuffer* buf, int buf_len, IPEndPoint* address);
int InternalSendTo(IOBuffer* buf, int buf_len, const IPEndPoint* address);
// Version for using overlapped IO.
int InternalRecvFromOverlapped(IOBuffer* buf,
int buf_len,
IPEndPoint* address);
int InternalSendToOverlapped(IOBuffer* buf,
int buf_len,
const IPEndPoint* address);
// Version for using non-blocking IO.
int InternalRecvFromNonBlocking(IOBuffer* buf,
int buf_len,
IPEndPoint* address);
int InternalSendToNonBlocking(IOBuffer* buf,
int buf_len,
const IPEndPoint* address);
// Applies |socket_options_| to |socket_|. Should be called before
// Bind().
......@@ -211,10 +239,6 @@ class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
// Binds to a random port on |address|.
int RandomBind(const IPAddressNumber& address);
// Attempts to convert the data in |recv_addr_storage_| and |recv_addr_len_|
// to an IPEndPoint and writes it to |address|. Returns true on success.
bool ReceiveAddressToIPEndpoint(IPEndPoint* address) const;
SOCKET socket_;
int addr_family_;
bool is_connected_;
......@@ -247,6 +271,22 @@ class NET_EXPORT UDPSocketWin : NON_EXPORTED_BASE(public base::NonThreadSafe) {
// they are not destroyed while the OS still references them.
scoped_refptr<Core> core_;
// True if non-blocking IO is used.
bool use_non_blocking_io_;
// Watches |read_write_event_|.
base::win::ObjectWatcher read_write_watcher_;
// Events for read and write.
base::win::ScopedHandle read_write_event_;
// The buffers used in Read() and Write().
scoped_refptr<IOBuffer> read_iobuffer_;
scoped_refptr<IOBuffer> write_iobuffer_;
int read_iobuffer_len_;
int write_iobuffer_len_;
IPEndPoint* recv_from_address_;
// Cached copy of the current address we're sending to, if any. Used for
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment