Commit 4e8df86c authored by hclam@chromium.org's avatar hclam@chromium.org

Implement UdpTransport for Cast

This change is partially a cleanup and implementation of UdpTransport
for Cast. As the name suggests it is a UDP transport mechanism for
Cast using net::UdpSocket.

A behavior change is that a Cast receiver can have sender IP address
optional. Only after a packet is received that it can send packets. It
will reply only to that address and ignore packets coming from a
different address.

If a sender address is given to Cast receiver then there is no change.

BUG=331989

Review URL: https://codereview.chromium.org/125713002

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@243854 0039d316-1c4b-4281-b951-d872f2087c98
parent 306677bb
...@@ -97,6 +97,7 @@ ...@@ -97,6 +97,7 @@
'transport/rtp_sender/rtp_packetizer/rtp_packetizer_unittest.cc', 'transport/rtp_sender/rtp_packetizer/rtp_packetizer_unittest.cc',
'transport/rtp_sender/rtp_packetizer/test/rtp_header_parser.cc', 'transport/rtp_sender/rtp_packetizer/test/rtp_header_parser.cc',
'transport/rtp_sender/rtp_packetizer/test/rtp_header_parser.h', 'transport/rtp_sender/rtp_packetizer/test/rtp_header_parser.h',
'transport/transport/udp_transport_unittest.cc',
'video_receiver/video_decoder_unittest.cc', 'video_receiver/video_decoder_unittest.cc',
'video_receiver/video_receiver_unittest.cc', 'video_receiver/video_receiver_unittest.cc',
'video_sender/external_video_encoder_unittest.cc', 'video_sender/external_video_encoder_unittest.cc',
......
...@@ -22,7 +22,8 @@ ...@@ -22,7 +22,8 @@
#include "media/cast/cast_receiver.h" #include "media/cast/cast_receiver.h"
#include "media/cast/logging/logging_defines.h" #include "media/cast/logging/logging_defines.h"
#include "media/cast/test/utility/input_helper.h" #include "media/cast/test/utility/input_helper.h"
#include "media/cast/transport/transport/transport.h" #include "media/cast/transport/transport/udp_transport.h"
#include "net/base/net_util.h"
#if defined(OS_LINUX) #if defined(OS_LINUX)
#include "media/cast/test/linux_output_window.h" #include "media/cast/test/linux_output_window.h"
...@@ -31,9 +32,9 @@ ...@@ -31,9 +32,9 @@
namespace media { namespace media {
namespace cast { namespace cast {
// Settings chosen to match default sender settings. // Settings chosen to match default sender settings.
#define DEFAULT_SEND_PORT "2346" #define DEFAULT_SEND_PORT "0"
#define DEFAULT_RECEIVE_PORT "2344" #define DEFAULT_RECEIVE_PORT "2344"
#define DEFAULT_SEND_IP "127.0.0.1" #define DEFAULT_SEND_IP "0.0.0.0"
#define DEFAULT_RESTART "0" #define DEFAULT_RESTART "0"
#define DEFAULT_AUDIO_FEEDBACK_SSRC "1" #define DEFAULT_AUDIO_FEEDBACK_SSRC "1"
#define DEFAULT_AUDIO_INCOMING_SSRC "2" #define DEFAULT_AUDIO_INCOMING_SSRC "2"
...@@ -64,11 +65,11 @@ void GetPorts(int* tx_port, int* rx_port) { ...@@ -64,11 +65,11 @@ void GetPorts(int* tx_port, int* rx_port) {
} }
std::string GetIpAddress(const std::string display_text) { std::string GetIpAddress(const std::string display_text) {
test::InputBuilder input(display_text, DEFAULT_SEND_IP, test::InputBuilder input(display_text, DEFAULT_SEND_IP, INT_MIN, INT_MAX);
INT_MIN, INT_MAX);
std::string ip_address = input.GetStringInput(); std::string ip_address = input.GetStringInput();
// Ensure correct form: // Ensure IP address is either the default value or in correct form.
while (std::count(ip_address.begin(), ip_address.end(), '.') != 3) { while (ip_address != DEFAULT_SEND_IP &&
std::count(ip_address.begin(), ip_address.end(), '.') != 3) {
ip_address = input.GetStringInput(); ip_address = input.GetStringInput();
} }
return ip_address; return ip_address;
...@@ -216,10 +217,8 @@ int main(int argc, char** argv) { ...@@ -216,10 +217,8 @@ int main(int argc, char** argv) {
InitLogging(logging::LoggingSettings()); InitLogging(logging::LoggingSettings());
VLOG(1) << "Cast Receiver"; VLOG(1) << "Cast Receiver";
base::Thread main_thread("Cast main send thread");
base::Thread audio_thread("Cast audio decoder thread"); base::Thread audio_thread("Cast audio decoder thread");
base::Thread video_thread("Cast video decoder thread"); base::Thread video_thread("Cast video decoder thread");
main_thread.Start();
audio_thread.Start(); audio_thread.Start();
video_thread.Start(); video_thread.Start();
...@@ -227,12 +226,12 @@ int main(int argc, char** argv) { ...@@ -227,12 +226,12 @@ int main(int argc, char** argv) {
// Enable receiver side threads, and disable logging. // Enable receiver side threads, and disable logging.
scoped_refptr<media::cast::CastEnvironment> cast_environment(new scoped_refptr<media::cast::CastEnvironment> cast_environment(new
media::cast::CastEnvironment(&clock, media::cast::CastEnvironment(&clock,
main_thread.message_loop_proxy(), main_message_loop.message_loop_proxy(),
NULL, NULL,
audio_thread.message_loop_proxy(), audio_thread.message_loop_proxy(),
NULL, NULL,
video_thread.message_loop_proxy(), video_thread.message_loop_proxy(),
main_thread.message_loop_proxy(), main_message_loop.message_loop_proxy(),
media::cast::GetDefaultCastLoggingConfig())); media::cast::GetDefaultCastLoggingConfig()));
media::cast::AudioReceiverConfig audio_config = media::cast::AudioReceiverConfig audio_config =
...@@ -240,31 +239,51 @@ int main(int argc, char** argv) { ...@@ -240,31 +239,51 @@ int main(int argc, char** argv) {
media::cast::VideoReceiverConfig video_config = media::cast::VideoReceiverConfig video_config =
media::cast::GetVideoReceiverConfig(); media::cast::GetVideoReceiverConfig();
scoped_ptr<media::cast::transport::Transport> transport( int remote_port, local_port;
new media::cast::transport::Transport( media::cast::GetPorts(&remote_port, &local_port);
main_message_loop.message_loop_proxy())); if (!local_port) {
LOG(ERROR) << "Invalid local port.";
return 1;
}
std::string remote_ip_address = media::cast::GetIpAddress("Enter remote IP.");
std::string local_ip_address = media::cast::GetIpAddress("Enter local IP.");
net::IPAddressNumber remote_ip_number;
net::IPAddressNumber local_ip_number;
if (!net::ParseIPLiteralToNumber(remote_ip_address, &remote_ip_number)) {
LOG(ERROR) << "Invalid remote IP address.";
return 1;
}
if (!net::ParseIPLiteralToNumber(local_ip_address, &local_ip_number)) {
LOG(ERROR) << "Invalid local IP address.";
return 1;
}
net::IPEndPoint remote_end_point(remote_ip_number, remote_port);
net::IPEndPoint local_end_point(local_ip_number, local_port);
scoped_ptr<media::cast::transport::UdpTransport> transport(
new media::cast::transport::UdpTransport(
main_message_loop.message_loop_proxy(),
local_end_point,
remote_end_point));
scoped_ptr<media::cast::CastReceiver> cast_receiver( scoped_ptr<media::cast::CastReceiver> cast_receiver(
media::cast::CastReceiver::CreateCastReceiver( media::cast::CastReceiver::CreateCastReceiver(
cast_environment, cast_environment,
audio_config, audio_config,
video_config, video_config,
transport->packet_sender())); transport.get()));
media::cast::transport::PacketReceiver* packet_receiver = media::cast::transport::PacketReceiver* packet_receiver =
cast_receiver->packet_receiver(); cast_receiver->packet_receiver();
int send_to_port, receive_port; transport->StartReceiving(packet_receiver);
media::cast::GetPorts(&send_to_port, &receive_port);
std::string ip_address = media::cast::GetIpAddress("Enter destination IP.");
std::string local_ip_address = media::cast::GetIpAddress("Enter local IP.");
transport->SetLocalReceiver(packet_receiver, ip_address, local_ip_address,
receive_port);
transport->SetSendDestination(ip_address, send_to_port);
scoped_refptr<media::cast::ReceiveProcess> receive_process( scoped_refptr<media::cast::ReceiveProcess> receive_process(
new media::cast::ReceiveProcess(cast_receiver->frame_receiver())); new media::cast::ReceiveProcess(cast_receiver->frame_receiver()));
receive_process->Start(); receive_process->Start();
main_message_loop.Run(); main_message_loop.Run();
transport->StopReceiving();
return 0; return 0;
} }
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "media/cast/test/audio_utility.h" #include "media/cast/test/audio_utility.h"
#include "media/cast/test/utility/input_helper.h" #include "media/cast/test/utility/input_helper.h"
#include "media/cast/test/video_utility.h" #include "media/cast/test/video_utility.h"
#include "media/cast/transport/transport/transport.h" #include "media/cast/transport/transport/udp_transport.h"
#include "ui/gfx/size.h" #include "ui/gfx/size.h"
namespace media { namespace media {
...@@ -295,11 +295,9 @@ int main(int argc, char** argv) { ...@@ -295,11 +295,9 @@ int main(int argc, char** argv) {
base::AtExitManager at_exit; base::AtExitManager at_exit;
VLOG(1) << "Cast Sender"; VLOG(1) << "Cast Sender";
base::Thread test_thread("Cast sender test app thread"); base::Thread test_thread("Cast sender test app thread");
base::Thread main_thread("Cast main send thread");
base::Thread audio_thread("Cast audio encoder thread"); base::Thread audio_thread("Cast audio encoder thread");
base::Thread video_thread("Cast video encoder thread"); base::Thread video_thread("Cast video encoder thread");
test_thread.Start(); test_thread.Start();
main_thread.Start();
audio_thread.Start(); audio_thread.Start();
video_thread.Start(); video_thread.Start();
...@@ -310,12 +308,12 @@ int main(int argc, char** argv) { ...@@ -310,12 +308,12 @@ int main(int argc, char** argv) {
scoped_refptr<media::cast::CastEnvironment> cast_environment(new scoped_refptr<media::cast::CastEnvironment> cast_environment(new
media::cast::CastEnvironment( media::cast::CastEnvironment(
&clock, &clock,
main_thread.message_loop_proxy(), io_message_loop.message_loop_proxy(),
audio_thread.message_loop_proxy(), audio_thread.message_loop_proxy(),
NULL, NULL,
video_thread.message_loop_proxy(), video_thread.message_loop_proxy(),
NULL, NULL,
main_thread.message_loop_proxy(), io_message_loop.message_loop_proxy(),
media::cast::GetDefaultCastLoggingConfig())); media::cast::GetDefaultCastLoggingConfig()));
media::cast::AudioSenderConfig audio_config = media::cast::AudioSenderConfig audio_config =
...@@ -323,27 +321,39 @@ int main(int argc, char** argv) { ...@@ -323,27 +321,39 @@ int main(int argc, char** argv) {
media::cast::VideoSenderConfig video_config = media::cast::VideoSenderConfig video_config =
media::cast::GetVideoSenderConfig(); media::cast::GetVideoSenderConfig();
scoped_ptr<media::cast::transport::Transport> transport( int remote_port, local_port;
new media::cast::transport::Transport( media::cast::GetPorts(&remote_port, &local_port);
io_message_loop.message_loop_proxy()));
std::string remote_ip_address =
media::cast::GetIpAddress("Enter receiver IP.");
std::string local_ip_address = media::cast::GetIpAddress("Enter local IP.");
net::IPAddressNumber remote_ip_number;
net::IPAddressNumber local_ip_number;
if (!net::ParseIPLiteralToNumber(remote_ip_address, &remote_ip_number)) {
LOG(ERROR) << "Invalid remote IP address.";
return 1;
}
if (!net::ParseIPLiteralToNumber(local_ip_address, &local_ip_number)) {
LOG(ERROR) << "Invalid local IP address.";
return 1;
}
net::IPEndPoint remote_end_point(remote_ip_number, remote_port);
net::IPEndPoint local_end_point(local_ip_number, local_port);
scoped_ptr<media::cast::transport::UdpTransport> transport(
new media::cast::transport::UdpTransport(
io_message_loop.message_loop_proxy(),
local_end_point,
remote_end_point));
scoped_ptr<media::cast::CastSender> cast_sender( scoped_ptr<media::cast::CastSender> cast_sender(
media::cast::CastSender::CreateCastSender(cast_environment, media::cast::CastSender::CreateCastSender(cast_environment,
audio_config, audio_config,
video_config, video_config,
NULL, // VideoEncoderController. NULL, // VideoEncoderController.
transport->packet_sender())); transport.get()));
transport->StartReceiving(cast_sender->packet_receiver());
media::cast::transport::PacketReceiver* packet_receiver =
cast_sender->packet_receiver();
int send_to_port, receive_port;
media::cast::GetPorts(&send_to_port, &receive_port);
std::string ip_address = media::cast::GetIpAddress("Enter destination IP.");
std::string local_ip_address = media::cast::GetIpAddress("Enter local IP.");
transport->SetLocalReceiver(packet_receiver, ip_address, local_ip_address,
receive_port);
transport->SetSendDestination(ip_address, send_to_port);
media::cast::FrameInput* frame_input = cast_sender->frame_input(); media::cast::FrameInput* frame_input = cast_sender->frame_input();
scoped_ptr<media::cast::SendProcess> send_process(new scoped_ptr<media::cast::SendProcess> send_process(new
...@@ -354,6 +364,5 @@ int main(int argc, char** argv) { ...@@ -354,6 +364,5 @@ int main(int argc, char** argv) {
send_process->SendFrame(); send_process->SendFrame();
io_message_loop.Run(); io_message_loop.Run();
transport->StopReceiving();
return 0; return 0;
} }
...@@ -33,8 +33,8 @@ ...@@ -33,8 +33,8 @@
'rtp_sender/rtp_packetizer/rtp_packetizer.h', 'rtp_sender/rtp_packetizer/rtp_packetizer.h',
'rtp_sender/rtp_sender.cc', 'rtp_sender/rtp_sender.cc',
'rtp_sender/rtp_sender.h', 'rtp_sender/rtp_sender.h',
'transport/transport.cc', 'transport/udp_transport.cc',
'transport/transport.h', 'transport/udp_transport.h',
], # source ], # source
}, },
], # targets, ], # targets,
......
// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/cast/transport/transport/transport.h"
#include <string>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/rand_callback.h"
namespace media {
namespace cast {
namespace transport {
const int kMaxPacketSize = 1500;
class LocalUdpTransportData;
void CreateUDPAddress(std::string ip_str, int port, net::IPEndPoint* address) {
net::IPAddressNumber ip_number;
bool rv = net::ParseIPLiteralToNumber(ip_str, &ip_number);
if (!rv)
return;
*address = net::IPEndPoint(ip_number, port);
}
class LocalUdpTransportData
: public base::RefCountedThreadSafe<LocalUdpTransportData> {
public:
LocalUdpTransportData(net::UDPServerSocket* udp_socket,
scoped_refptr<base::TaskRunner> io_thread_proxy)
: udp_socket_(udp_socket),
buffer_(new net::IOBufferWithSize(kMaxPacketSize)),
io_thread_proxy_(io_thread_proxy) {
}
void ListenTo(net::IPEndPoint bind_address) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
bind_address_ = bind_address;
io_thread_proxy_->PostTask(FROM_HERE,
base::Bind(&LocalUdpTransportData::RecvFromSocketLoop, this));
}
void DeletePacket(uint8* data) {
// Should be called from the receiver (not on the transport thread).
DCHECK(!(io_thread_proxy_->RunsTasksOnCurrentThread()));
delete [] data;
}
void PacketReceived(int size) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
// Got a packet with length result.
uint8* data = new uint8[size];
memcpy(data, buffer_->data(), size);
packet_receiver_->ReceivedPacket(data, size,
base::Bind(&LocalUdpTransportData::DeletePacket, this, data));
RecvFromSocketLoop();
}
void RecvFromSocketLoop() {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
// Callback should always trigger with a packet.
int res = udp_socket_->RecvFrom(buffer_.get(), kMaxPacketSize,
&bind_address_, base::Bind(&LocalUdpTransportData::PacketReceived,
this));
DCHECK(res >= net::ERR_IO_PENDING);
if (res > 0) {
PacketReceived(res);
}
}
void set_packet_receiver(PacketReceiver* packet_receiver) {
packet_receiver_ = packet_receiver;
}
void Close() {
udp_socket_->Close();
}
protected:
virtual ~LocalUdpTransportData() {}
private:
friend class base::RefCountedThreadSafe<LocalUdpTransportData>;
net::UDPServerSocket* udp_socket_;
net::IPEndPoint bind_address_;
PacketReceiver* packet_receiver_;
scoped_refptr<net::IOBufferWithSize> buffer_;
scoped_refptr<base::TaskRunner> io_thread_proxy_;
DISALLOW_COPY_AND_ASSIGN(LocalUdpTransportData);
};
class LocalPacketSender : public PacketSender,
public base::RefCountedThreadSafe<LocalPacketSender> {
public:
LocalPacketSender(net::UDPServerSocket* udp_socket,
scoped_refptr<base::TaskRunner> io_thread_proxy)
: udp_socket_(udp_socket),
send_address_(),
io_thread_proxy_(io_thread_proxy) {}
virtual bool SendPacket(const Packet& packet) OVERRIDE {
io_thread_proxy_->PostTask(FROM_HERE,
base::Bind(&LocalPacketSender::SendPacketToNetwork, this, packet));
return true;
}
virtual void SendPacketToNetwork(const Packet& packet) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
const uint8* data = &packet[0];
scoped_refptr<net::WrappedIOBuffer> buffer(
new net::WrappedIOBuffer(reinterpret_cast<const char*>(data)));
udp_socket_->SendTo(buffer.get(), static_cast<int>(packet.size()),
send_address_,
base::Bind(&LocalPacketSender::OnSendCompleted,
base::Unretained(this)));
}
virtual void OnSendCompleted(int result) {
if (result < 0) {
// TODO(mikhal): Add to error messages.
VLOG(0) << "Send failed on UDP socket : " << result;
}
}
virtual bool SendPackets(const PacketList& packets) OVERRIDE {
bool out_val = true;
for (size_t i = 0; i < packets.size(); ++i) {
const Packet& packet = packets[i];
out_val |= SendPacket(packet);
}
return out_val;
}
void SetSendAddress(const net::IPEndPoint& send_address) {
send_address_ = send_address;
}
protected:
virtual ~LocalPacketSender() {}
private:
friend class base::RefCountedThreadSafe<LocalPacketSender>;
net::UDPServerSocket* udp_socket_; // Not owned by this class.
net::IPEndPoint send_address_;
scoped_refptr<base::TaskRunner> io_thread_proxy_;
};
Transport::Transport(
scoped_refptr<base::TaskRunner> io_thread_proxy)
: udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())),
local_udp_transport_data_(new LocalUdpTransportData(udp_socket_.get(),
io_thread_proxy)),
packet_sender_(new LocalPacketSender(udp_socket_.get(), io_thread_proxy)),
io_thread_proxy_(io_thread_proxy) {}
Transport::~Transport() {}
PacketSender* Transport::packet_sender() {
return static_cast<PacketSender*>(packet_sender_.get());
}
void Transport::StopReceiving() {
local_udp_transport_data_->Close();
}
void Transport::SetLocalReceiver(PacketReceiver* packet_receiver,
std::string ip_address,
std::string local_ip_address,
int port) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
net::IPEndPoint bind_address, local_bind_address;
CreateUDPAddress(ip_address, port, &bind_address);
CreateUDPAddress(local_ip_address, port, &local_bind_address);
local_udp_transport_data_->set_packet_receiver(packet_receiver);
udp_socket_->AllowAddressReuse();
udp_socket_->SetMulticastLoopbackMode(true);
udp_socket_->Listen(local_bind_address);
// Start listening once receiver has been set.
local_udp_transport_data_->ListenTo(bind_address);
}
void Transport::SetSendDestination(std::string ip_address, int port) {
net::IPEndPoint send_address;
CreateUDPAddress(ip_address, port, &send_address);
packet_sender_->SetSendAddress(send_address);
}
} // namespace transport
} // namespace cast
} // namespace media
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/cast/transport/transport/udp_transport.h"
#include <algorithm>
#include <string>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/rand_callback.h"
namespace media {
namespace cast {
namespace transport {
namespace {
const int kMaxPacketSize = 1500;
bool IsEmpty(const net::IPEndPoint& addr) {
net::IPAddressNumber empty_addr(addr.address().size());
return std::equal(empty_addr.begin(),
empty_addr.end(),
addr.address().begin());
}
bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
return addr1.port() == addr2.port() &&
std::equal(addr1.address().begin(),
addr1.address().end(),
addr2.address().begin());
}
} // namespace
UdpTransport::UdpTransport(
const scoped_refptr<base::TaskRunner>& io_thread_proxy,
const net::IPEndPoint& local_end_point,
const net::IPEndPoint& remote_end_point)
: io_thread_proxy_(io_thread_proxy),
local_addr_(local_end_point),
remote_addr_(remote_end_point),
udp_socket_(new net::UDPServerSocket(NULL, net::NetLog::Source())),
recv_buf_(new net::IOBuffer(kMaxPacketSize)),
packet_receiver_(NULL),
weak_factory_(this) {
}
UdpTransport::~UdpTransport() {
}
void UdpTransport::StartReceiving(PacketReceiver* packet_receiver) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
DCHECK(!packet_receiver_);
packet_receiver_ = packet_receiver;
udp_socket_->AllowAddressReuse();
udp_socket_->SetMulticastLoopbackMode(true);
udp_socket_->Listen(local_addr_);
ReceiveOnePacket();
}
void UdpTransport::ReceiveOnePacket() {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
int result = udp_socket_->RecvFrom(
recv_buf_,
kMaxPacketSize,
&recv_addr_,
base::Bind(&UdpTransport::OnReceived, weak_factory_.GetWeakPtr()));
if (result > 0) {
OnReceived(result);
} else if (result != net::ERR_IO_PENDING) {
LOG(ERROR) << "Failed to receive packet: " << result << "."
<< " Stop receiving packets.";
}
}
void UdpTransport::OnReceived(int result) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
if (result < 0) {
LOG(ERROR) << "Failed to receive packet: " << result << "."
<< " Stop receiving packets.";
return;
}
if (IsEmpty(remote_addr_)) {
remote_addr_ = recv_addr_;
VLOG(1) << "First packet received from: "
<< remote_addr_.ToString() << ".";
} else if (!IsEqual(remote_addr_, recv_addr_)) {
VLOG(1) << "Received from an unrecognized address: "
<< recv_addr_.ToString() << ".";
return;
}
// TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy.
uint8* data = new uint8[result];
memcpy(data, recv_buf_->data(), result);
packet_receiver_->ReceivedPacket(
data,
result,
base::Bind(&PacketReceiver::DeletePacket, data));
ReceiveOnePacket();
}
bool UdpTransport::SendPackets(const PacketList& packets) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
bool result = true;
for (size_t i = 0; i < packets.size(); ++i) {
result |= SendPacket(packets[i]);
}
return result;
}
bool UdpTransport::SendPacket(const Packet& packet) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
// TODO(hclam): This interface should take a net::IOBuffer to minimize
// memcpy.
scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(
static_cast<int>(packet.size()));
memcpy(buf->data(), &packet[0], packet.size());
int ret = udp_socket_->SendTo(
buf,
static_cast<int>(packet.size()),
remote_addr_,
base::Bind(&UdpTransport::OnSent,
weak_factory_.GetWeakPtr(), buf));
return ret == net::OK;
}
void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
int result) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
if (result < 0) {
VLOG(1) << "Failed to send packet: " << result << ".";
}
}
} // namespace transport
} // namespace cast
} // namespace media
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_CAST_TRANSPORT_TRANSPORT_UDP_TRANSPORT_H_
#define MEDIA_CAST_TRANSPORT_TRANSPORT_UDP_TRANSPORT_H_
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "media/cast/cast_config.h"
#include "media/cast/cast_environment.h"
#include "net/base/ip_endpoint.h"
#include "net/udp/udp_server_socket.h"
namespace net {
class IOBuffer;
class IPEndPoint;
} // namespace net
namespace media {
namespace cast {
namespace transport {
// This class implements UDP transport mechanism for Cast.
class UdpTransport : public PacketSender {
public:
// Construct a UDP transport.
// All methods must be called on |io_thread_proxy|.
// |local_end_point| specifies the address and port to bind and listen
// to incoming packets.
// |remote_end_point| specifies the address and port to send packets
// to. If the value is 0.0.0.0:0 the the end point is set to the source
// address of the first packet received.
UdpTransport(const scoped_refptr<base::TaskRunner>& io_thread_proxy,
const net::IPEndPoint& local_end_point,
const net::IPEndPoint& remote_end_point);
virtual ~UdpTransport();
// Start receiving packets. Packets are submitted to |packet_receiver|.
void StartReceiving(PacketReceiver* packet_receiver);
// PacketSender implementations.
virtual bool SendPackets(const PacketList& packets) OVERRIDE;
virtual bool SendPacket(const Packet& packet) OVERRIDE;
private:
void ReceiveOnePacket();
void OnReceived(int result);
void OnSent(const scoped_refptr<net::IOBuffer>& buf, int result);
scoped_refptr<base::TaskRunner> io_thread_proxy_;
net::IPEndPoint local_addr_;
net::IPEndPoint remote_addr_;
scoped_ptr<net::UDPServerSocket> udp_socket_;
scoped_refptr<net::IOBuffer> recv_buf_;
net::IPEndPoint recv_addr_;
PacketReceiver* packet_receiver_;
base::WeakPtrFactory<UdpTransport> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(UdpTransport);
};
} // namespace transport
} // namespace cast
} // namespace media
#endif // MEDIA_CAST_TRANSPORT_TRANSPORT_UDP_TRANSPORT_H_
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "media/cast/transport/transport/udp_transport.h"
#include <algorithm>
#include <string>
#include <vector>
#include "base/bind.h"
#include "base/callback.h"
#include "base/message_loop/message_loop.h"
#include "base/run_loop.h"
#include "media/cast/cast_config.h"
#include "net/base/net_util.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace media {
namespace cast {
namespace transport {
class MockPacketReceiver : public PacketReceiver {
public:
MockPacketReceiver(const base::Closure& callback)
: packet_callback_(callback) {
}
virtual void ReceivedPacket(const uint8* packet, size_t length,
const base::Closure callback) OVERRIDE {
packet_ = std::string(length, '\0');
std::copy(packet, packet + length, packet_.begin());
callback.Run();
packet_callback_.Run();
}
std::string packet() const { return packet_; }
private:
friend class base::RefCountedThreadSafe<PacketReceiver>;
virtual ~MockPacketReceiver() {}
std::string packet_;
base::Closure packet_callback_;
DISALLOW_COPY_AND_ASSIGN(MockPacketReceiver);
};
void SendPacket(UdpTransport* transport, Packet packet) {
transport->SendPacket(packet);
}
TEST(UdpTransport, SendAndReceive) {
base::MessageLoopForIO message_loop;
net::IPAddressNumber local_addr_number;
net::IPAddressNumber empty_addr_number;
net::ParseIPLiteralToNumber("127.0.0.1", &local_addr_number);
net::ParseIPLiteralToNumber("0.0.0.0", &empty_addr_number);
UdpTransport send_transport(message_loop.message_loop_proxy(),
net::IPEndPoint(local_addr_number, 2344),
net::IPEndPoint(local_addr_number, 2345));
UdpTransport recv_transport(message_loop.message_loop_proxy(),
net::IPEndPoint(local_addr_number, 2345),
net::IPEndPoint(empty_addr_number, 0));
Packet packet;
packet.push_back('t');
packet.push_back('e');
packet.push_back('s');
packet.push_back('t');
base::RunLoop run_loop;
scoped_refptr<MockPacketReceiver> receiver1 = new MockPacketReceiver(
run_loop.QuitClosure());
scoped_refptr<MockPacketReceiver> receiver2 = new MockPacketReceiver(
base::Bind(&SendPacket, &recv_transport, packet));
send_transport.StartReceiving(receiver1);
recv_transport.StartReceiving(receiver2);
send_transport.SendPacket(packet);
run_loop.Run();
EXPECT_TRUE(std::equal(packet.begin(),
packet.end(),
receiver1->packet().begin()));
EXPECT_TRUE(std::equal(packet.begin(),
packet.end(),
receiver2->packet().begin()));
}
} // namespace transport
} // namespace cast
} // namespace media
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment