Commit 2b400c19 authored by rockot's avatar rockot Committed by Commit bot

De-Clientize UDPSocket service

BUG=451321

Review URL: https://codereview.chromium.org/880613005

Cr-Commit-Position: refs/heads/master@{#313984}
parent eebe273b
......@@ -66,8 +66,9 @@ void NetworkServiceImpl::CreateTCPConnectedSocket(
callback.Run(MakeNetworkError(net::ERR_NOT_IMPLEMENTED), NetAddressPtr());
}
void NetworkServiceImpl::CreateUDPSocket(InterfaceRequest<UDPSocket> socket) {
BindToRequest(new UDPSocketImpl(), &socket);
void NetworkServiceImpl::CreateUDPSocket(InterfaceRequest<UDPSocket> request) {
// The lifetime of this UDPSocketImpl is bound to that of the underlying pipe.
new UDPSocketImpl(request.Pass());
}
} // namespace mojo
......@@ -40,6 +40,23 @@ void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const {
delegate_->OnSendToCompleted(result.Pass(), forward_callback_);
}
UDPSocketWrapper::ReceiverBindingCallback::ReceiverBindingCallback(
UDPSocketWrapper* delegate,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& wrapper_callback)
: delegate_(delegate), wrapper_callback_(wrapper_callback) {
}
UDPSocketWrapper::ReceiverBindingCallback::~ReceiverBindingCallback() {
}
void UDPSocketWrapper::ReceiverBindingCallback::Run(
NetworkErrorPtr result,
NetAddressPtr addr,
InterfaceRequest<UDPSocketReceiver> request) const {
delegate_->StartReceivingData(request.Pass());
wrapper_callback_.Run(result.Pass(), addr.Pass());
}
UDPSocketWrapper::ReceivedData::ReceivedData() {}
UDPSocketWrapper::ReceivedData::~ReceivedData() {}
......@@ -47,20 +64,22 @@ UDPSocketWrapper::SendRequest::SendRequest() {}
UDPSocketWrapper::SendRequest::~SendRequest() {}
UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket)
: socket_(socket.Pass()),
max_receive_queue_size_(kDefaultReceiveQueueSlots),
max_pending_sends_(1),
current_pending_sends_(0) {
: binding_(this),
socket_(socket.Pass()),
max_receive_queue_size_(kDefaultReceiveQueueSlots),
max_pending_sends_(1),
current_pending_sends_(0) {
Initialize(0);
}
UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket,
uint32_t receive_queue_slots,
uint32_t requested_max_pending_sends)
: socket_(socket.Pass()),
max_receive_queue_size_(receive_queue_slots),
max_pending_sends_(1),
current_pending_sends_(0) {
: binding_(this),
socket_(socket.Pass()),
max_receive_queue_size_(receive_queue_slots),
max_pending_sends_(1),
current_pending_sends_(0) {
Initialize(requested_max_pending_sends);
}
......@@ -82,13 +101,19 @@ void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) {
void UDPSocketWrapper::Bind(
NetAddressPtr addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
socket_->Bind(addr.Pass(), callback);
socket_->Bind(
addr.Pass(),
BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable*>(
new ReceiverBindingCallback(this, callback))));
}
void UDPSocketWrapper::Connect(
NetAddressPtr remote_addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
socket_->Connect(remote_addr.Pass(), callback);
socket_->Connect(
remote_addr.Pass(),
BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable*>(
new ReceiverBindingCallback(this, callback))));
}
void UDPSocketWrapper::SetSendBufferSize(uint32_t size,
......@@ -159,13 +184,11 @@ void UDPSocketWrapper::OnReceived(NetworkErrorPtr result,
}
void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) {
socket_.set_client(this);
socket_->NegotiateMaxPendingSendRequests(
requested_max_pending_sends,
Callback<void(uint32_t)>(
static_cast< Callback<void(uint32_t)>::Runnable*>(
new NegotiateCallbackHandler(this))));
socket_->ReceiveMore(max_receive_queue_size_);
}
void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
......@@ -210,4 +233,10 @@ bool UDPSocketWrapper::ProcessNextSendRequest() {
return true;
}
void UDPSocketWrapper::StartReceivingData(
InterfaceRequest<UDPSocketReceiver> request) {
binding_.Bind(request.Pass());
socket_->ReceiveMore(max_receive_queue_size_);
}
} // namespace mojo
......@@ -8,6 +8,7 @@
#include <queue>
#include "network/public/interfaces/udp_socket.mojom.h"
#include "third_party/mojo/src/mojo/public/cpp/bindings/binding.h"
namespace mojo {
......@@ -18,11 +19,15 @@ namespace mojo {
// - You don't need to worry about the max-pending-send-requests restriction
// imposed by the service side. If you make many SendTo() calls in a short
// period of time, it caches excessive requests and sends them later.
class UDPSocketWrapper : public UDPSocketClient {
class UDPSocketWrapper : public UDPSocketReceiver {
public:
typedef Callback<void(NetworkErrorPtr, NetAddressPtr, Array<uint8_t>)>
ReceiveCallback;
typedef Callback<void(NetworkErrorPtr)> ErrorCallback;
using ReceiveCallback =
Callback<void(NetworkErrorPtr, NetAddressPtr, Array<uint8_t>)>;
using ErrorCallback = Callback<void(NetworkErrorPtr)>;
using BindOrConnectCallback =
Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>;
explicit UDPSocketWrapper(UDPSocketPtr socket);
......@@ -98,6 +103,26 @@ class UDPSocketWrapper : public UDPSocketClient {
ErrorCallback forward_callback_;
};
class ReceiverBindingCallback : public BindOrConnectCallback::Runnable {
public:
ReceiverBindingCallback(
UDPSocketWrapper* delegate,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& wrapper_callback);
~ReceiverBindingCallback() override;
// BindOrConnectCallback::Runnable implementation:
void Run(NetworkErrorPtr result,
NetAddressPtr addr,
InterfaceRequest<UDPSocketReceiver> request) const override;
private:
// Because this callback is passed to a method of |socket_|, and |socket_|
// is owned by |delegate_|, it should be safe to assume that |delegate_| is
// valid if/when Run() is called.
UDPSocketWrapper* delegate_;
const Callback<void(NetworkErrorPtr, NetAddressPtr)> wrapper_callback_;
};
struct ReceivedData {
ReceivedData();
~ReceivedData();
......@@ -116,7 +141,7 @@ class UDPSocketWrapper : public UDPSocketClient {
ErrorCallback callback;
};
// UDPSocketClient implementation:
// UDPSocketReceiver implementation:
void OnReceived(NetworkErrorPtr result,
NetAddressPtr src_addr,
Array<uint8_t> data) override;
......@@ -130,6 +155,12 @@ class UDPSocketWrapper : public UDPSocketClient {
// Returns true if a send request in |send_requests_| has been processed.
bool ProcessNextSendRequest();
// Binds to a UDPSocketReceiver request and notifies |socket_| that we're
// ready to start receiving data.
void StartReceivingData(InterfaceRequest<UDPSocketReceiver> request);
Binding<UDPSocketReceiver> binding_;
UDPSocketPtr socket_;
uint32_t max_receive_queue_size_;
......
......@@ -7,17 +7,15 @@ module mojo;
import "network/public/interfaces/net_address.mojom";
import "network/public/interfaces/network_error.mojom";
// UDPSocket and UDPSocketClient represent a UDP socket and its client. The
// UDPSocket and UDPSocketReceiver represent a UDP socket and its client. The
// typical flow of using the interfaces is:
// - Acquire a UDPSocket interface pointer and set a UDPSocketClient instance.
// - Acquire a UDPSocket interface pointer.
// - (optional) Set options which are allowed prior to Bind()/Connect().
// - Bind or connect the socket.
// - (optional) Bind the UDPSocketReceiver request returned by Bind()/Connect()
// - (optional) Set options which are allowed after Bind()/Connect().
// - Send / request to receive datagrams. Received datagrams will be delivered
// to UDPSocketClient.OnReceived().
// TODO(yzshen): Get rid of [Client] annotation.
[Client=UDPSocketClient]
// to the bound receiver's OnReceived() call.
interface UDPSocket {
// Allows the socket to share the local address to which it will be bound with
// other processes. Should be called before Bind().
......@@ -28,15 +26,20 @@ interface UDPSocket {
// connected.
// |bound_addr| is non-null on success. It might not be the same as |addr|.
// For example, if port 0 is used in |addr|, an available port is picked and
// returned in |bound_addr|.
Bind(NetAddress addr) => (NetworkError result, NetAddress? bound_addr);
// returned in |bound_addr|. The caller may provide an implementation of
// |receiver| to receive datagrams read from the socket. |receiver| is null
// on failure.
Bind(NetAddress addr) => (NetworkError result, NetAddress? bound_addr,
UDPSocketReceiver&? receiver);
// Connects the socket to the remote address. The socket must not be bound or
// connected.
// |local_addr| is non-null on success.
// The caller may provide an implementation of |receiver| to receive datagrams
// read from the socket. |receiver| is null on failure.
Connect(NetAddress remote_addr) => (NetworkError result,
NetAddress? local_addr);
NetAddress? local_addr,
UDPSocketReceiver&? receiver);
// Sets the OS send buffer size (in bytes) for the socket. The socket must be
// bound or connected.
......@@ -60,8 +63,8 @@ interface UDPSocket {
NegotiateMaxPendingSendRequests(uint32 requested_size)
=> (uint32 actual_size);
// Notifies that the client is ready to accept |number| of datagrams.
// Correspondingly, OnReceived() of the UDPSocketClient interface will be
// Notifies that the receiver is ready to accept |number| of datagrams.
// Correspondingly, OnReceived() of the UDPSocketReceiver interface will be
// called |number| times (errors also count), unless the connection is closed
// before that.
//
......@@ -108,7 +111,7 @@ interface UDPSocket {
SendTo(NetAddress? dest_addr, array<uint8> data) => (NetworkError result);
};
interface UDPSocketClient {
interface UDPSocketReceiver {
// On success, |data| is non-null, |src_addr| is non-null if the socket is
// not connected, |result.code| is a non-negative number indicating how many
// bytes have been received. On failure, |result.code| is a network error
......
......@@ -129,6 +129,42 @@ class TestCallback : public TestCallbackBase<Callback<void(NetworkErrorPtr)>> {
NetworkErrorPtr result_;
};
class TestCallbackWithAddressAndReceiver
: public TestCallbackBase<
Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>> {
public:
TestCallbackWithAddressAndReceiver() { Initialize(new State()); }
~TestCallbackWithAddressAndReceiver() {}
const NetworkErrorPtr& result() const { return result_; }
const NetAddressPtr& net_address() const { return net_address_; }
InterfaceRequest<UDPSocketReceiver>& receiver() { return receiver_; }
private:
struct State : public StateBase {
~State() override {}
void Run(NetworkErrorPtr result,
NetAddressPtr net_address,
InterfaceRequest<UDPSocketReceiver> receiver) const override {
if (test_callback_) {
TestCallbackWithAddressAndReceiver* callback =
static_cast<TestCallbackWithAddressAndReceiver*>(test_callback_);
callback->result_ = result.Pass();
callback->net_address_ = net_address.Pass();
callback->receiver_ = receiver.Pass();
}
NotifyRun();
}
};
NetworkErrorPtr result_;
NetAddressPtr net_address_;
InterfaceRequest<UDPSocketReceiver> receiver_;
};
class TestCallbackWithAddress
: public TestCallbackBase<Callback<void(NetworkErrorPtr, NetAddressPtr)>> {
public:
......@@ -228,11 +264,11 @@ struct ReceiveResult {
Array<uint8_t> data;
};
class UDPSocketClientImpl : public UDPSocketClient {
class UDPSocketReceiverImpl : public UDPSocketReceiver {
public:
UDPSocketClientImpl() : run_loop_(nullptr), expected_receive_count_(0) {}
UDPSocketReceiverImpl() : run_loop_(nullptr), expected_receive_count_(0) {}
~UDPSocketClientImpl() override {
~UDPSocketReceiverImpl() override {
while (!results_.empty()) {
delete results_.front();
results_.pop();
......@@ -275,12 +311,12 @@ class UDPSocketClientImpl : public UDPSocketClient {
std::queue<ReceiveResult*> results_;
size_t expected_receive_count_;
DISALLOW_COPY_AND_ASSIGN(UDPSocketClientImpl);
DISALLOW_COPY_AND_ASSIGN(UDPSocketReceiverImpl);
};
class UDPSocketAppTest : public test::ApplicationTestBase {
public:
UDPSocketAppTest() {}
UDPSocketAppTest() : receiver_binding_(&receiver_) {}
~UDPSocketAppTest() override {}
void SetUp() override {
......@@ -291,13 +327,13 @@ class UDPSocketAppTest : public test::ApplicationTestBase {
connection->ConnectToService(&network_service_);
network_service_->CreateUDPSocket(GetProxy(&socket_));
socket_.set_client(&receiver_);
}
protected:
NetworkServicePtr network_service_;
UDPSocketPtr socket_;
UDPSocketClientImpl receiver_;
UDPSocketReceiverImpl receiver_;
Binding<UDPSocketReceiver> receiver_binding_;
DISALLOW_COPY_AND_ASSIGN(UDPSocketAppTest);
};
......@@ -322,7 +358,7 @@ TEST_F(UDPSocketAppTest, Settings) {
callback3.WaitForResult();
EXPECT_NE(net::OK, callback3.result()->code);
TestCallbackWithAddress callback4;
TestCallbackWithAddressAndReceiver callback4;
socket_->Bind(GetLocalHostWithAnyPort(), callback4.callback());
callback4.WaitForResult();
EXPECT_EQ(net::OK, callback4.result()->code);
......@@ -356,18 +392,20 @@ TEST_F(UDPSocketAppTest, Settings) {
}
TEST_F(UDPSocketAppTest, TestReadWrite) {
TestCallbackWithAddress callback1;
TestCallbackWithAddressAndReceiver callback1;
socket_->Bind(GetLocalHostWithAnyPort(), callback1.callback());
callback1.WaitForResult();
ASSERT_EQ(net::OK, callback1.result()->code);
ASSERT_NE(0u, callback1.net_address()->ipv4->port);
receiver_binding_.Bind(callback1.receiver().Pass());
NetAddressPtr server_addr = callback1.net_address().Clone();
UDPSocketPtr client_socket;
network_service_->CreateUDPSocket(GetProxy(&client_socket));
TestCallbackWithAddress callback2;
TestCallbackWithAddressAndReceiver callback2;
client_socket->Bind(GetLocalHostWithAnyPort(), callback2.callback());
callback2.WaitForResult();
ASSERT_EQ(net::OK, callback2.result()->code);
......@@ -402,25 +440,29 @@ TEST_F(UDPSocketAppTest, TestReadWrite) {
}
TEST_F(UDPSocketAppTest, TestConnectedReadWrite) {
TestCallbackWithAddress callback1;
TestCallbackWithAddressAndReceiver callback1;
socket_->Bind(GetLocalHostWithAnyPort(), callback1.callback());
callback1.WaitForResult();
ASSERT_EQ(net::OK, callback1.result()->code);
ASSERT_NE(0u, callback1.net_address()->ipv4->port);
receiver_binding_.Bind(callback1.receiver().Pass());
NetAddressPtr server_addr = callback1.net_address().Clone();
UDPSocketPtr client_socket;
network_service_->CreateUDPSocket(GetProxy(&client_socket));
UDPSocketClientImpl client_socket_receiver;
client_socket.set_client(&client_socket_receiver);
TestCallbackWithAddress callback2;
TestCallbackWithAddressAndReceiver callback2;
client_socket->Connect(server_addr.Clone(), callback2.callback());
callback2.WaitForResult();
ASSERT_EQ(net::OK, callback2.result()->code);
ASSERT_NE(0u, callback2.net_address()->ipv4->port);
UDPSocketReceiverImpl client_socket_receiver;
Binding<UDPSocketReceiver> client_receiver_binding(
&client_socket_receiver, callback2.receiver().Pass());
NetAddressPtr client_addr = callback2.net_address().Clone();
const size_t kDatagramCount = 6;
......
......@@ -34,9 +34,12 @@ UDPSocketImpl::PendingSendRequest::PendingSendRequest() {}
UDPSocketImpl::PendingSendRequest::~PendingSendRequest() {}
UDPSocketImpl::UDPSocketImpl()
: socket_(net::DatagramSocket::DEFAULT_BIND, net::RandIntCallback(),
nullptr, net::NetLog::Source()),
UDPSocketImpl::UDPSocketImpl(InterfaceRequest<UDPSocket> request)
: binding_(this, request.Pass()),
socket_(net::DatagramSocket::DEFAULT_BIND,
net::RandIntCallback(),
nullptr,
net::NetLog::Source()),
state_(NOT_BOUND_OR_CONNECTED),
allow_address_reuse_(false),
remaining_recv_slots_(0),
......@@ -60,7 +63,9 @@ void UDPSocketImpl::AllowAddressReuse(
void UDPSocketImpl::Bind(
NetAddressPtr addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
const Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>& callback) {
int net_result = net::OK;
bool opened = false;
......@@ -98,7 +103,7 @@ void UDPSocketImpl::Bind(
state_ = BOUND;
callback.Run(MakeNetworkError(net_result),
NetAddress::From(bound_ip_end_point));
NetAddress::From(bound_ip_end_point), GetProxy(&receiver_));
if (remaining_recv_slots_ > 0) {
DCHECK(!recvfrom_buffer_.get());
......@@ -110,12 +115,14 @@ void UDPSocketImpl::Bind(
DCHECK(net_result != net::OK);
if (opened)
socket_.Close();
callback.Run(MakeNetworkError(net_result), nullptr);
callback.Run(MakeNetworkError(net_result), nullptr, nullptr);
}
void UDPSocketImpl::Connect(
NetAddressPtr remote_addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
const Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>& callback) {
int net_result = net::OK;
bool opened = false;
......@@ -147,7 +154,7 @@ void UDPSocketImpl::Connect(
state_ = CONNECTED;
callback.Run(MakeNetworkError(net_result),
NetAddress::From(local_ip_end_point));
NetAddress::From(local_ip_end_point), GetProxy(&receiver_));
if (remaining_recv_slots_ > 0) {
DCHECK(!recvfrom_buffer_.get());
......@@ -159,7 +166,7 @@ void UDPSocketImpl::Connect(
DCHECK(net_result != net::OK);
if (opened)
socket_.Close();
callback.Run(MakeNetworkError(net_result), nullptr);
callback.Run(MakeNetworkError(net_result), nullptr, nullptr);
}
void UDPSocketImpl::SetSendBufferSize(
......@@ -216,6 +223,8 @@ void UDPSocketImpl::NegotiateMaxPendingSendRequests(
}
void UDPSocketImpl::ReceiveMore(uint32_t datagram_number) {
if (!receiver_)
return;
if (datagram_number == 0)
return;
if (std::numeric_limits<size_t>::max() - remaining_recv_slots_ <
......@@ -264,6 +273,7 @@ void UDPSocketImpl::SendTo(NetAddressPtr dest_addr,
void UDPSocketImpl::DoRecvFrom() {
DCHECK(IsBoundOrConnected());
DCHECK(receiver_);
DCHECK(!recvfrom_buffer_.get());
DCHECK_GT(remaining_recv_slots_, 0u);
......@@ -337,9 +347,8 @@ void UDPSocketImpl::OnRecvFromCompleted(int net_result) {
}
recvfrom_buffer_ = nullptr;
client()->OnReceived(MakeNetworkError(net_result), net_address.Pass(),
array.Pass());
receiver_->OnReceived(MakeNetworkError(net_result), net_address.Pass(),
array.Pass());
DCHECK_GT(remaining_recv_slots_, 0u);
remaining_recv_slots_--;
if (remaining_recv_slots_ > 0)
......
......@@ -13,6 +13,7 @@
#include "net/base/ip_endpoint.h"
#include "net/udp/udp_socket.h"
#include "third_party/mojo/src/mojo/public/cpp/bindings/interface_impl.h"
#include "third_party/mojo/src/mojo/public/cpp/bindings/strong_binding.h"
namespace net {
class IOBuffer;
......@@ -21,22 +22,28 @@ class IOBufferWithSize;
namespace mojo {
class UDPSocketImpl : public InterfaceImpl<UDPSocket> {
class UDPSocketImpl : public UDPSocket {
public:
UDPSocketImpl();
// The lifetime of a new UDPSocketImpl is bound to the connection associated
// with |request|.
explicit UDPSocketImpl(InterfaceRequest<UDPSocket> request);
~UDPSocketImpl() override;
// UDPSocket implementation.
void AllowAddressReuse(
const Callback<void(NetworkErrorPtr)>& callback) override;
void Bind(
NetAddressPtr addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) override;
void Bind(NetAddressPtr addr,
const Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>& callback)
override;
void Connect(
NetAddressPtr remote_addr,
const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) override;
void Connect(NetAddressPtr remote_addr,
const Callback<void(NetworkErrorPtr,
NetAddressPtr,
InterfaceRequest<UDPSocketReceiver>)>&
callback) override;
void SetSendBufferSize(
uint32_t size,
......@@ -85,6 +92,8 @@ class UDPSocketImpl : public InterfaceImpl<UDPSocket> {
return state_ == BOUND || state_ == CONNECTED;
}
StrongBinding<UDPSocket> binding_;
net::UDPSocket socket_;
State state_;
......@@ -96,8 +105,12 @@ class UDPSocketImpl : public InterfaceImpl<UDPSocket> {
// Non-null when there is a pending SendTo operation on |socket_|.
scoped_refptr<net::IOBufferWithSize> sendto_buffer_;
// The address of the pending RecvFrom operation, if any.
net::IPEndPoint recvfrom_address_;
// The interface which gets data from fulfilled receive requests.
UDPSocketReceiverPtr receiver_;
// How many more packets the client side expects to receive.
size_t remaining_recv_slots_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment