Commit 449e2fc9 authored by kmarshall's avatar kmarshall Committed by Commit bot

Decouple Blimp transport output from BlimpConnections using MessagePort.

This seam allows us to reuse our existing network connection factories for
hypothetical future non-BlimpConnection connection objects.

Also modified/rewrote existing unit tests to accommodate new API.

R=wez@chromium.org,lethalantidote@chromium.org
BUG=635711

Review-Url: https://codereview.chromium.org/2236093002
Cr-Commit-Position: refs/heads/master@{#414245}
parent d8fe6fb4
...@@ -56,6 +56,8 @@ component("net") { ...@@ -56,6 +56,8 @@ component("net") {
"input_message_converter.h", "input_message_converter.h",
"input_message_generator.cc", "input_message_generator.cc",
"input_message_generator.h", "input_message_generator.h",
"message_port.cc",
"message_port.h",
"null_blimp_message_processor.cc", "null_blimp_message_processor.cc",
"null_blimp_message_processor.h", "null_blimp_message_processor.h",
"ssl_client_transport.cc", "ssl_client_transport.cc",
...@@ -64,8 +66,6 @@ component("net") { ...@@ -64,8 +66,6 @@ component("net") {
"stream_packet_reader.h", "stream_packet_reader.h",
"stream_packet_writer.cc", "stream_packet_writer.cc",
"stream_packet_writer.h", "stream_packet_writer.h",
"stream_socket_connection.cc",
"stream_socket_connection.h",
"tcp_client_transport.cc", "tcp_client_transport.cc",
"tcp_client_transport.h", "tcp_client_transport.h",
"tcp_engine_transport.cc", "tcp_engine_transport.cc",
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include "blimp/net/blimp_connection.h" #include "blimp/net/blimp_connection.h"
#include <utility>
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/logging.h" #include "base/logging.h"
#include "base/macros.h" #include "base/macros.h"
...@@ -15,7 +17,7 @@ ...@@ -15,7 +17,7 @@
#include "blimp/net/blimp_message_pump.h" #include "blimp/net/blimp_message_pump.h"
#include "blimp/net/common.h" #include "blimp/net/common.h"
#include "blimp/net/connection_error_observer.h" #include "blimp/net/connection_error_observer.h"
#include "blimp/net/packet_reader.h" #include "blimp/net/message_port.h"
#include "blimp/net/packet_writer.h" #include "blimp/net/packet_writer.h"
#include "net/base/completion_callback.h" #include "net/base/completion_callback.h"
...@@ -150,16 +152,11 @@ void BlimpConnection::EndConnectionFilter::ProcessMessage( ...@@ -150,16 +152,11 @@ void BlimpConnection::EndConnectionFilter::ProcessMessage(
message_handler_->ProcessMessage(std::move(message), callback); message_handler_->ProcessMessage(std::move(message), callback);
} }
BlimpConnection::BlimpConnection(std::unique_ptr<PacketReader> reader, BlimpConnection::BlimpConnection(std::unique_ptr<MessagePort> message_port)
std::unique_ptr<PacketWriter> writer) : message_port_(std::move(message_port)),
: reader_(std::move(reader)), message_pump_(new BlimpMessagePump(message_port_->reader())),
message_pump_(new BlimpMessagePump(reader_.get())), outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())),
writer_(std::move(writer)),
outgoing_msg_processor_(new BlimpMessageSender(writer_.get())),
end_connection_filter_(new EndConnectionFilter(this)) { end_connection_filter_(new EndConnectionFilter(this)) {
DCHECK(writer_);
DCHECK(reader_);
message_pump_->set_error_observer(this); message_pump_->set_error_observer(this);
outgoing_msg_processor_->set_error_observer(this); outgoing_msg_processor_->set_error_observer(this);
} }
......
...@@ -17,16 +17,13 @@ namespace blimp { ...@@ -17,16 +17,13 @@ namespace blimp {
class BlimpMessageProcessor; class BlimpMessageProcessor;
class BlimpMessagePump; class BlimpMessagePump;
class BlimpMessageSender; class BlimpMessageSender;
class PacketReader; class MessagePort;
class PacketWriter;
// Encapsulates the state and logic used to exchange BlimpMessages over // Encapsulates the state and logic used to exchange BlimpMessages over
// a network connection. // a network connection.
class BLIMP_NET_EXPORT BlimpConnection : public ConnectionErrorObserver { class BLIMP_NET_EXPORT BlimpConnection : public ConnectionErrorObserver {
public: public:
BlimpConnection(std::unique_ptr<PacketReader> reader, explicit BlimpConnection(std::unique_ptr<MessagePort> message_port);
std::unique_ptr<PacketWriter> writer);
~BlimpConnection() override; ~BlimpConnection() override;
// Adds |observer| to the connection's error observer list. // Adds |observer| to the connection's error observer list.
...@@ -53,9 +50,8 @@ class BLIMP_NET_EXPORT BlimpConnection : public ConnectionErrorObserver { ...@@ -53,9 +50,8 @@ class BLIMP_NET_EXPORT BlimpConnection : public ConnectionErrorObserver {
void OnConnectionError(int error) override; void OnConnectionError(int error) override;
private: private:
std::unique_ptr<PacketReader> reader_; std::unique_ptr<MessagePort> message_port_;
std::unique_ptr<BlimpMessagePump> message_pump_; std::unique_ptr<BlimpMessagePump> message_pump_;
std::unique_ptr<PacketWriter> writer_;
std::unique_ptr<BlimpMessageSender> outgoing_msg_processor_; std::unique_ptr<BlimpMessageSender> outgoing_msg_processor_;
base::ObserverList<ConnectionErrorObserver> error_observers_; base::ObserverList<ConnectionErrorObserver> error_observers_;
std::unique_ptr<EndConnectionFilter> end_connection_filter_; std::unique_ptr<EndConnectionFilter> end_connection_filter_;
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
#include <utility>
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
...@@ -15,6 +16,7 @@ ...@@ -15,6 +16,7 @@
#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/common.h" #include "blimp/net/common.h"
#include "blimp/net/connection_error_observer.h" #include "blimp/net/connection_error_observer.h"
#include "blimp/net/message_port.h"
#include "blimp/net/test_common.h" #include "blimp/net/test_common.h"
#include "net/base/completion_callback.h" #include "net/base/completion_callback.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
...@@ -34,14 +36,14 @@ namespace { ...@@ -34,14 +36,14 @@ namespace {
class BlimpConnectionTest : public testing::Test { class BlimpConnectionTest : public testing::Test {
public: public:
BlimpConnectionTest() { BlimpConnectionTest() {
std::unique_ptr<testing::StrictMock<MockPacketWriter>> writer( std::unique_ptr<MockPacketReader> mock_reader(new MockPacketReader);
new testing::StrictMock<MockPacketWriter>); std::unique_ptr<MockPacketWriter> mock_writer(new MockPacketWriter);
writer_ = writer.get(); mock_reader_ = mock_reader.get();
std::unique_ptr<testing::StrictMock<MockPacketReader>> reader( mock_writer_ = mock_writer.get();
new testing::StrictMock<MockPacketReader>); connection_ =
reader_ = reader.get(); base::MakeUnique<BlimpConnection>(base::MakeUnique<MessagePort>(
connection_.reset( std::move(mock_reader), std::move(mock_writer)));
new BlimpConnection(std::move(reader), std::move(writer)));
connection_->AddConnectionErrorObserver(&error_observer1_); connection_->AddConnectionErrorObserver(&error_observer1_);
connection_->AddConnectionErrorObserver(&error_observer2_); connection_->AddConnectionErrorObserver(&error_observer2_);
connection_->AddConnectionErrorObserver(&error_observer3_); connection_->AddConnectionErrorObserver(&error_observer3_);
...@@ -64,8 +66,8 @@ class BlimpConnectionTest : public testing::Test { ...@@ -64,8 +66,8 @@ class BlimpConnectionTest : public testing::Test {
} }
base::MessageLoop message_loop_; base::MessageLoop message_loop_;
testing::StrictMock<MockPacketReader>* reader_; MockPacketReader* mock_reader_;
testing::StrictMock<MockPacketWriter>* writer_; MockPacketWriter* mock_writer_;
testing::StrictMock<MockConnectionErrorObserver> error_observer1_; testing::StrictMock<MockConnectionErrorObserver> error_observer1_;
testing::StrictMock<MockConnectionErrorObserver> error_observer2_; testing::StrictMock<MockConnectionErrorObserver> error_observer2_;
...@@ -82,11 +84,11 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) { ...@@ -82,11 +84,11 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWrite) {
net::CompletionCallback write_packet_cb; net::CompletionCallback write_packet_cb;
InSequence s; InSequence s;
EXPECT_CALL(*writer_, EXPECT_CALL(*mock_writer_,
WritePacket(BufferEqualsProto(*CreateInputMessage()), _)) WritePacket(BufferEqualsProto(*CreateInputMessage()), _))
.WillOnce(SaveArg<1>(&write_packet_cb)) .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation(); .RetiresOnSaturation();
EXPECT_CALL(*writer_, EXPECT_CALL(*mock_writer_,
WritePacket(BufferEqualsProto(*CreateControlMessage()), _)) WritePacket(BufferEqualsProto(*CreateControlMessage()), _))
.WillOnce(SaveArg<1>(&write_packet_cb)) .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation(); .RetiresOnSaturation();
...@@ -118,11 +120,11 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) { ...@@ -118,11 +120,11 @@ TEST_F(BlimpConnectionTest, AsyncTwoPacketsWriteWithError) {
net::CompletionCallback write_packet_cb; net::CompletionCallback write_packet_cb;
InSequence s; InSequence s;
EXPECT_CALL(*writer_, EXPECT_CALL(*mock_writer_,
WritePacket(BufferEqualsProto(*CreateInputMessage()), _)) WritePacket(BufferEqualsProto(*CreateInputMessage()), _))
.WillOnce(SaveArg<1>(&write_packet_cb)) .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation(); .RetiresOnSaturation();
EXPECT_CALL(*writer_, EXPECT_CALL(*mock_writer_,
WritePacket(BufferEqualsProto(*CreateControlMessage()), _)) WritePacket(BufferEqualsProto(*CreateControlMessage()), _))
.WillOnce(SaveArg<1>(&write_packet_cb)) .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation(); .RetiresOnSaturation();
...@@ -148,7 +150,7 @@ TEST_F(BlimpConnectionTest, DeleteHappyObserversAreOK) { ...@@ -148,7 +150,7 @@ TEST_F(BlimpConnectionTest, DeleteHappyObserversAreOK) {
net::CompletionCallback write_packet_cb; net::CompletionCallback write_packet_cb;
InSequence s; InSequence s;
EXPECT_CALL(*writer_, EXPECT_CALL(*mock_writer_,
WritePacket(BufferEqualsProto(*CreateInputMessage()), _)) WritePacket(BufferEqualsProto(*CreateInputMessage()), _))
.WillOnce(SaveArg<1>(&write_packet_cb)) .WillOnce(SaveArg<1>(&write_packet_cb))
.RetiresOnSaturation(); .RetiresOnSaturation();
...@@ -168,7 +170,7 @@ TEST_F(BlimpConnectionTest, ReadPacketErrorInvokesErrorObservers) { ...@@ -168,7 +170,7 @@ TEST_F(BlimpConnectionTest, ReadPacketErrorInvokesErrorObservers) {
scoped_refptr<net::GrowableIOBuffer> read_packet_buffer; scoped_refptr<net::GrowableIOBuffer> read_packet_buffer;
net::CompletionCallback read_packet_cb; net::CompletionCallback read_packet_cb;
EXPECT_CALL(*reader_, ReadPacket(_, _)) EXPECT_CALL(*mock_reader_, ReadPacket(_, _))
.WillOnce( .WillOnce(
DoAll(SaveArg<0>(&read_packet_buffer), SaveArg<1>(&read_packet_cb))) DoAll(SaveArg<0>(&read_packet_buffer), SaveArg<1>(&read_packet_cb)))
.RetiresOnSaturation(); .RetiresOnSaturation();
...@@ -194,7 +196,7 @@ TEST_F(BlimpConnectionTest, EndConnectionInvokesErrorObservers) { ...@@ -194,7 +196,7 @@ TEST_F(BlimpConnectionTest, EndConnectionInvokesErrorObservers) {
scoped_refptr<net::GrowableIOBuffer> read_packet_buffer; scoped_refptr<net::GrowableIOBuffer> read_packet_buffer;
net::CompletionCallback read_packet_cb; net::CompletionCallback read_packet_cb;
EXPECT_CALL(*reader_, ReadPacket(_, _)) EXPECT_CALL(*mock_reader_, ReadPacket(_, _))
.WillOnce( .WillOnce(
DoAll(SaveArg<0>(&read_packet_buffer), SaveArg<1>(&read_packet_cb))) DoAll(SaveArg<0>(&read_packet_buffer), SaveArg<1>(&read_packet_cb)))
.WillOnce(Return()) .WillOnce(Return())
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
namespace blimp { namespace blimp {
class BlimpConnection; class BlimpConnection;
class MessagePort;
// An interface which encapsulates the transport-specific code for // An interface which encapsulates the transport-specific code for
// establishing network connections between the client and engine. // establishing network connections between the client and engine.
...@@ -24,17 +25,16 @@ class BlimpTransport { ...@@ -24,17 +25,16 @@ class BlimpTransport {
// Initiate or listen for a connection. // Initiate or listen for a connection.
// //
// |callback| will be invoked with the connection outcome: // |callback| is passed net::OK if a connection was successfully
// * net::OK if a connection is established successful, the BlimpConnection // established. The connection's MessagePort can then be taken by calling
// can be taken by calling TakeConnection(). // TakeMessagePort().
// * net::ERR_IO_PENDING will never be the outcome // All other values indicate a connection error.
// * Other error code indicates no connection was established.
virtual void Connect(const net::CompletionCallback& callback) = 0; virtual void Connect(const net::CompletionCallback& callback) = 0;
// Returns the connection object after a successful Connect(). // Returns the MessagePort of a successfully established connection.
virtual std::unique_ptr<BlimpConnection> TakeConnection() = 0; virtual std::unique_ptr<MessagePort> TakeMessagePort() = 0;
// Gets transport name, e.g. "TCP", "SSL", "mock", etc. // Gets the transport name, e.g. "TCP", "SSL", "mock", etc.
virtual const char* GetName() const = 0; virtual const char* GetName() const = 0;
}; };
......
...@@ -4,7 +4,10 @@ ...@@ -4,7 +4,10 @@
#include "blimp/net/client_connection_manager.h" #include "blimp/net/client_connection_manager.h"
#include <utility>
#include "base/logging.h" #include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "blimp/common/create_blimp_message.h" #include "blimp/common/create_blimp_message.h"
#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/common/protocol_version.h" #include "blimp/common/protocol_version.h"
...@@ -13,6 +16,7 @@ ...@@ -13,6 +16,7 @@
#include "blimp/net/blimp_transport.h" #include "blimp/net/blimp_transport.h"
#include "blimp/net/browser_connection_handler.h" #include "blimp/net/browser_connection_handler.h"
#include "blimp/net/connection_handler.h" #include "blimp/net/connection_handler.h"
#include "blimp/net/message_port.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
namespace blimp { namespace blimp {
...@@ -54,7 +58,8 @@ void ClientConnectionManager::OnConnectResult(int transport_index, int result) { ...@@ -54,7 +58,8 @@ void ClientConnectionManager::OnConnectResult(int transport_index, int result) {
DCHECK_NE(result, net::ERR_IO_PENDING); DCHECK_NE(result, net::ERR_IO_PENDING);
const auto& transport = transports_[transport_index]; const auto& transport = transports_[transport_index];
if (result == net::OK) { if (result == net::OK) {
std::unique_ptr<BlimpConnection> connection = transport->TakeConnection(); std::unique_ptr<BlimpConnection> connection =
base::MakeUnique<BlimpConnection>(transport->TakeMessagePort());
connection->AddConnectionErrorObserver(this); connection->AddConnectionErrorObserver(this);
SendAuthenticationMessage(std::move(connection)); SendAuthenticationMessage(std::move(connection));
} else { } else {
......
...@@ -5,8 +5,8 @@ ...@@ -5,8 +5,8 @@
#include "blimp/net/client_connection_manager.h" #include "blimp/net/client_connection_manager.h"
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
#include <utility>
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
...@@ -14,8 +14,6 @@ ...@@ -14,8 +14,6 @@
#include "blimp/common/create_blimp_message.h" #include "blimp/common/create_blimp_message.h"
#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/common/protocol_version.h" #include "blimp/common/protocol_version.h"
#include "blimp/net/blimp_connection.h"
#include "blimp/net/blimp_transport.h"
#include "blimp/net/test_common.h" #include "blimp/net/test_common.h"
#include "net/base/completion_callback.h" #include "net/base/completion_callback.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
...@@ -39,10 +37,8 @@ class ClientConnectionManagerTest : public testing::Test { ...@@ -39,10 +37,8 @@ class ClientConnectionManagerTest : public testing::Test {
: manager_(new ClientConnectionManager(&connection_handler_)), : manager_(new ClientConnectionManager(&connection_handler_)),
transport1_(new testing::StrictMock<MockTransport>), transport1_(new testing::StrictMock<MockTransport>),
transport2_(new testing::StrictMock<MockTransport>), transport2_(new testing::StrictMock<MockTransport>),
reader_(new MockPacketReader), reader_(new testing::StrictMock<MockPacketReader>),
writer_(new MockPacketWriter), writer_(new testing::StrictMock<MockPacketWriter>),
connection_(new BlimpConnection(base::WrapUnique(reader_),
base::WrapUnique(writer_))),
start_connection_message_( start_connection_message_(
CreateStartConnectionMessage(kDummyClientToken, kProtocolVersion)) { CreateStartConnectionMessage(kDummyClientToken, kProtocolVersion)) {
manager_->set_client_token(kDummyClientToken); manager_->set_client_token(kDummyClientToken);
...@@ -56,9 +52,8 @@ class ClientConnectionManagerTest : public testing::Test { ...@@ -56,9 +52,8 @@ class ClientConnectionManagerTest : public testing::Test {
std::unique_ptr<ClientConnectionManager> manager_; std::unique_ptr<ClientConnectionManager> manager_;
std::unique_ptr<testing::StrictMock<MockTransport>> transport1_; std::unique_ptr<testing::StrictMock<MockTransport>> transport1_;
std::unique_ptr<testing::StrictMock<MockTransport>> transport2_; std::unique_ptr<testing::StrictMock<MockTransport>> transport2_;
MockPacketReader* reader_; std::unique_ptr<MockPacketReader> reader_;
MockPacketWriter* writer_; std::unique_ptr<MockPacketWriter> writer_;
std::unique_ptr<BlimpConnection> connection_;
std::unique_ptr<BlimpMessage> start_connection_message_; std::unique_ptr<BlimpMessage> start_connection_message_;
}; };
...@@ -67,18 +62,20 @@ TEST_F(ClientConnectionManagerTest, FirstTransportConnects) { ...@@ -67,18 +62,20 @@ TEST_F(ClientConnectionManagerTest, FirstTransportConnects) {
net::CompletionCallback write_cb; net::CompletionCallback write_cb;
net::CompletionCallback connect_cb_1; net::CompletionCallback connect_cb_1;
EXPECT_CALL(*transport1_, Connect(_)).WillOnce(SaveArg<0>(&connect_cb_1)); EXPECT_CALL(*transport1_, Connect(_)).WillOnce(SaveArg<0>(&connect_cb_1));
EXPECT_CALL(connection_handler_, HandleConnectionPtr(Eq(connection_.get()))); EXPECT_CALL(connection_handler_, HandleConnectionPtr(_));
EXPECT_CALL(*writer_, EXPECT_CALL(*writer_,
WritePacket(BufferEqualsProto(*start_connection_message_), _)) WritePacket(BufferEqualsProto(*start_connection_message_), _))
.WillOnce(SaveArg<1>(&write_cb)); .WillOnce(SaveArg<1>(&write_cb));
EXPECT_CALL(*transport1_, TakeConnectionPtr())
.WillOnce(Return(connection_.release()));
ASSERT_TRUE(connect_cb_1.is_null()); EXPECT_CALL(*transport1_, TakeMessagePortPtr())
.WillOnce(
Return(new MessagePort(std::move(reader_), std::move(writer_))));
EXPECT_TRUE(connect_cb_1.is_null());
manager_->AddTransport(std::move(transport1_)); manager_->AddTransport(std::move(transport1_));
manager_->AddTransport(std::move(transport2_)); manager_->AddTransport(std::move(transport2_));
manager_->Connect(); manager_->Connect();
ASSERT_FALSE(connect_cb_1.is_null()); EXPECT_FALSE(connect_cb_1.is_null());
base::ResetAndReturn(&connect_cb_1).Run(net::OK); base::ResetAndReturn(&connect_cb_1).Run(net::OK);
base::ResetAndReturn(&write_cb).Run(net::OK); base::ResetAndReturn(&write_cb).Run(net::OK);
} }
...@@ -93,18 +90,19 @@ TEST_F(ClientConnectionManagerTest, SecondTransportConnects) { ...@@ -93,18 +90,19 @@ TEST_F(ClientConnectionManagerTest, SecondTransportConnects) {
EXPECT_CALL(*writer_, EXPECT_CALL(*writer_,
WritePacket(BufferEqualsProto(*start_connection_message_), _)) WritePacket(BufferEqualsProto(*start_connection_message_), _))
.WillOnce(SaveArg<1>(&write_cb)); .WillOnce(SaveArg<1>(&write_cb));
EXPECT_CALL(connection_handler_, HandleConnectionPtr(Eq(connection_.get()))); EXPECT_CALL(connection_handler_, HandleConnectionPtr(_));
EXPECT_CALL(*transport2_, TakeConnectionPtr()) EXPECT_CALL(*transport2_, TakeMessagePortPtr())
.WillOnce(Return(connection_.release())); .WillOnce(
Return(new MessagePort(std::move(reader_), std::move(writer_))));
ASSERT_TRUE(connect_cb_1.is_null()); EXPECT_TRUE(connect_cb_1.is_null());
ASSERT_TRUE(connect_cb_2.is_null()); EXPECT_TRUE(connect_cb_2.is_null());
manager_->AddTransport(std::move(transport1_)); manager_->AddTransport(std::move(transport1_));
manager_->AddTransport(std::move(transport2_)); manager_->AddTransport(std::move(transport2_));
manager_->Connect(); manager_->Connect();
ASSERT_FALSE(connect_cb_1.is_null()); EXPECT_FALSE(connect_cb_1.is_null());
base::ResetAndReturn(&connect_cb_1).Run(net::ERR_FAILED); base::ResetAndReturn(&connect_cb_1).Run(net::ERR_FAILED);
ASSERT_FALSE(connect_cb_2.is_null()); EXPECT_FALSE(connect_cb_2.is_null());
base::ResetAndReturn(&connect_cb_2).Run(net::OK); base::ResetAndReturn(&connect_cb_2).Run(net::OK);
base::ResetAndReturn(&write_cb).Run(net::OK); base::ResetAndReturn(&write_cb).Run(net::OK);
} }
...@@ -116,15 +114,15 @@ TEST_F(ClientConnectionManagerTest, BothTransportsFailToConnect) { ...@@ -116,15 +114,15 @@ TEST_F(ClientConnectionManagerTest, BothTransportsFailToConnect) {
net::CompletionCallback connect_cb_2; net::CompletionCallback connect_cb_2;
EXPECT_CALL(*transport2_, Connect(_)).WillOnce(SaveArg<0>(&connect_cb_2)); EXPECT_CALL(*transport2_, Connect(_)).WillOnce(SaveArg<0>(&connect_cb_2));
ASSERT_TRUE(connect_cb_1.is_null()); EXPECT_TRUE(connect_cb_1.is_null());
ASSERT_TRUE(connect_cb_2.is_null()); EXPECT_TRUE(connect_cb_2.is_null());
manager_->AddTransport(std::move(transport1_)); manager_->AddTransport(std::move(transport1_));
manager_->AddTransport(std::move(transport2_)); manager_->AddTransport(std::move(transport2_));
manager_->Connect(); manager_->Connect();
ASSERT_FALSE(connect_cb_1.is_null()); EXPECT_FALSE(connect_cb_1.is_null());
ASSERT_TRUE(connect_cb_2.is_null()); EXPECT_TRUE(connect_cb_2.is_null());
base::ResetAndReturn(&connect_cb_1).Run(net::ERR_FAILED); base::ResetAndReturn(&connect_cb_1).Run(net::ERR_FAILED);
ASSERT_FALSE(connect_cb_2.is_null()); EXPECT_FALSE(connect_cb_2.is_null());
base::ResetAndReturn(&connect_cb_2).Run(net::ERR_FAILED); base::ResetAndReturn(&connect_cb_2).Run(net::ERR_FAILED);
} }
......
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
// found in the LICENSE file. // found in the LICENSE file.
#include <stddef.h> #include <stddef.h>
#include <memory>
#include <string> #include <string>
#include <utility>
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/test/test_mock_time_task_runner.h" #include "base/test/test_mock_time_task_runner.h"
......
...@@ -4,9 +4,13 @@ ...@@ -4,9 +4,13 @@
#include "blimp/net/engine_connection_manager.h" #include "blimp/net/engine_connection_manager.h"
#include <utility>
#include "base/logging.h" #include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "blimp/net/blimp_connection.h" #include "blimp/net/blimp_connection.h"
#include "blimp/net/blimp_transport.h" #include "blimp/net/blimp_transport.h"
#include "blimp/net/message_port.h"
#include "net/base/net_errors.h" #include "net/base/net_errors.h"
namespace blimp { namespace blimp {
...@@ -34,9 +38,9 @@ void EngineConnectionManager::Connect(BlimpTransport* transport) { ...@@ -34,9 +38,9 @@ void EngineConnectionManager::Connect(BlimpTransport* transport) {
void EngineConnectionManager::OnConnectResult(BlimpTransport* transport, void EngineConnectionManager::OnConnectResult(BlimpTransport* transport,
int result) { int result) {
// Expects engine transport to be reliably, thus |result| is always net::OK. CHECK_EQ(net::OK, result) << "Transport failure:" << transport->GetName();
CHECK(result == net::OK) << "Transport failure:" << transport->GetName(); connection_handler_->HandleConnection(
connection_handler_->HandleConnection(transport->TakeConnection()); base::MakeUnique<BlimpConnection>(transport->TakeMessagePort()));
Connect(transport); Connect(transport);
} }
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <stddef.h> #include <stddef.h>
#include <string> #include <string>
#include <utility>
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
...@@ -40,39 +41,42 @@ class EngineConnectionManagerTest : public testing::Test { ...@@ -40,39 +41,42 @@ class EngineConnectionManagerTest : public testing::Test {
std::unique_ptr<BlimpConnection> CreateConnection() { std::unique_ptr<BlimpConnection> CreateConnection() {
return base::MakeUnique<BlimpConnection>( return base::MakeUnique<BlimpConnection>(
base::WrapUnique(new MockPacketReader), base::MakeUnique<MessagePort>(base::MakeUnique<MockPacketReader>(),
base::WrapUnique(new MockPacketWriter)); base::MakeUnique<MockPacketWriter>()));
} }
protected: protected:
base::MessageLoopForIO message_loop_;
testing::StrictMock<MockConnectionHandler> connection_handler_; testing::StrictMock<MockConnectionHandler> connection_handler_;
std::unique_ptr<EngineConnectionManager> manager_; std::unique_ptr<EngineConnectionManager> manager_;
}; };
TEST_F(EngineConnectionManagerTest, ConnectionSucceeds) { TEST_F(EngineConnectionManagerTest, ConnectionSucceeds) {
std::unique_ptr<testing::StrictMock<MockTransport>> transport1( std::unique_ptr<testing::StrictMock<MockTransport>> transport1(
new testing::StrictMock<MockTransport>); new testing::StrictMock<MockTransport>());
std::unique_ptr<testing::StrictMock<MockTransport>> transport2( std::unique_ptr<testing::StrictMock<MockTransport>> transport2(
new testing::StrictMock<MockTransport>); new testing::StrictMock<MockTransport>());
std::unique_ptr<BlimpConnection> connection1 = CreateConnection();
net::CompletionCallback connect_cb_1; net::CompletionCallback connect_cb_1;
net::CompletionCallback connect_cb_2;
EXPECT_CALL(*transport1, Connect(_)) EXPECT_CALL(*transport1, Connect(_))
.Times(2) .Times(2)
.WillRepeatedly(SaveArg<0>(&connect_cb_1)); .WillRepeatedly(SaveArg<0>(&connect_cb_1));
EXPECT_CALL(connection_handler_, HandleConnectionPtr(Eq(connection1.get())));
EXPECT_CALL(*transport1, TakeConnectionPtr())
.WillOnce(Return(connection1.release()));
std::unique_ptr<BlimpConnection> connection2 = CreateConnection();
net::CompletionCallback connect_cb_2;
EXPECT_CALL(*transport2, Connect(_)) EXPECT_CALL(*transport2, Connect(_))
.Times(2) .Times(2)
.WillRepeatedly(SaveArg<0>(&connect_cb_2)); .WillRepeatedly(SaveArg<0>(&connect_cb_2));
EXPECT_CALL(connection_handler_, HandleConnectionPtr(Eq(connection2.get())));
EXPECT_CALL(*transport2, TakeConnectionPtr()) std::unique_ptr<MessagePort> port1 =
.WillOnce(Return(connection2.release())); base::MakeUnique<MessagePort>(base::MakeUnique<MockPacketReader>(),
base::MakeUnique<MockPacketWriter>());
std::unique_ptr<MessagePort> port2 =
base::MakeUnique<MessagePort>(base::MakeUnique<MockPacketReader>(),
base::MakeUnique<MockPacketWriter>());
EXPECT_CALL(connection_handler_, HandleConnectionPtr(_)).Times(2);
EXPECT_CALL(*transport1, TakeMessagePortPtr())
.WillOnce(Return(port1.release()));
EXPECT_CALL(*transport2, TakeMessagePortPtr())
.WillOnce(Return(port2.release()));
ASSERT_TRUE(connect_cb_1.is_null()); ASSERT_TRUE(connect_cb_1.is_null());
manager_->AddTransport(std::move(transport1)); manager_->AddTransport(std::move(transport1));
......
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "blimp/net/message_port.h"
#include <utility>
#include "base/memory/ptr_util.h"
#include "blimp/net/compressed_packet_reader.h"
#include "blimp/net/compressed_packet_writer.h"
#include "blimp/net/packet_reader.h"
#include "blimp/net/packet_writer.h"
#include "blimp/net/stream_packet_reader.h"
#include "blimp/net/stream_packet_writer.h"
#include "net/socket/stream_socket.h"
namespace blimp {
namespace {
class CompressedStreamSocketMessagePort : public MessagePort {
public:
explicit CompressedStreamSocketMessagePort(
std::unique_ptr<net::StreamSocket> socket);
~CompressedStreamSocketMessagePort() override {}
private:
std::unique_ptr<net::StreamSocket> socket_;
DISALLOW_COPY_AND_ASSIGN(CompressedStreamSocketMessagePort);
};
CompressedStreamSocketMessagePort::CompressedStreamSocketMessagePort(
std::unique_ptr<net::StreamSocket> socket)
: MessagePort(base::MakeUnique<CompressedPacketReader>(
base::MakeUnique<StreamPacketReader>(socket.get())),
base::MakeUnique<CompressedPacketWriter>(
base::MakeUnique<StreamPacketWriter>(socket.get()))),
socket_(std::move(socket)) {}
} // namespace
// static
std::unique_ptr<MessagePort> MessagePort::CreateForStreamSocketWithCompression(
std::unique_ptr<net::StreamSocket> socket) {
return base::MakeUnique<CompressedStreamSocketMessagePort>(std::move(socket));
}
MessagePort::MessagePort(std::unique_ptr<PacketReader> reader,
std::unique_ptr<PacketWriter> writer)
: reader_(std::move(reader)), writer_(std::move(writer)) {}
MessagePort::~MessagePort() {}
} // namespace blimp
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BLIMP_NET_MESSAGE_PORT_H_
#define BLIMP_NET_MESSAGE_PORT_H_
#include <memory>
#include "base/macros.h"
#include "base/supports_user_data.h"
#include "blimp/net/blimp_net_export.h"
namespace net {
class StreamSocket;
}
namespace blimp {
class PacketReader;
class PacketWriter;
// A duplexed pair of a framed reader and writer.
class BLIMP_NET_EXPORT MessagePort {
public:
MessagePort(std::unique_ptr<PacketReader> reader,
std::unique_ptr<PacketWriter> writer);
virtual ~MessagePort();
static std::unique_ptr<MessagePort> CreateForStreamSocketWithCompression(
std::unique_ptr<net::StreamSocket> socket);
PacketReader* reader() const { return reader_.get(); }
PacketWriter* writer() const { return writer_.get(); }
private:
std::unique_ptr<PacketReader> reader_;
std::unique_ptr<PacketWriter> writer_;
DISALLOW_COPY_AND_ASSIGN(MessagePort);
};
} // namespace blimp
#endif // BLIMP_NET_MESSAGE_PORT_H_
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "base/run_loop.h" #include "base/run_loop.h"
#include "blimp/net/blimp_connection.h" #include "blimp/net/blimp_connection.h"
#include "blimp/net/message_port.h"
#include "blimp/net/ssl_client_transport.h" #include "blimp/net/ssl_client_transport.h"
#include "net/base/address_list.h" #include "net/base/address_list.h"
#include "net/base/ip_address.h" #include "net/base/ip_address.h"
...@@ -81,7 +82,7 @@ TEST_F(SSLClientTransportTest, ConnectSyncOK) { ...@@ -81,7 +82,7 @@ TEST_F(SSLClientTransportTest, ConnectSyncOK) {
SetupSSLSyncSocketConnect(net::OK); SetupSSLSyncSocketConnect(net::OK);
transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete, transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete,
base::Unretained(this))); base::Unretained(this)));
EXPECT_NE(nullptr, transport_->TakeConnection().get()); EXPECT_NE(nullptr, transport_->TakeMessagePort().get());
base::RunLoop().RunUntilIdle(); base::RunLoop().RunUntilIdle();
} }
} }
...@@ -96,7 +97,7 @@ TEST_F(SSLClientTransportTest, ConnectAsyncOK) { ...@@ -96,7 +97,7 @@ TEST_F(SSLClientTransportTest, ConnectAsyncOK) {
transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete, transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete,
base::Unretained(this))); base::Unretained(this)));
base::RunLoop().RunUntilIdle(); base::RunLoop().RunUntilIdle();
EXPECT_NE(nullptr, transport_->TakeConnection().get()); EXPECT_NE(nullptr, transport_->TakeMessagePort().get());
} }
} }
...@@ -155,7 +156,7 @@ TEST_F(SSLClientTransportTest, ConnectAfterError) { ...@@ -155,7 +156,7 @@ TEST_F(SSLClientTransportTest, ConnectAfterError) {
SetupSSLSyncSocketConnect(net::OK); SetupSSLSyncSocketConnect(net::OK);
transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete, transport_->Connect(base::Bind(&SSLClientTransportTest::ConnectComplete,
base::Unretained(this))); base::Unretained(this)));
EXPECT_NE(nullptr, transport_->TakeConnection().get()); EXPECT_NE(nullptr, transport_->TakeMessagePort().get());
base::RunLoop().RunUntilIdle(); base::RunLoop().RunUntilIdle();
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h" #include "base/message_loop/message_loop.h"
#include "blimp/net/message_port.h"
#include "blimp/net/stream_socket_connection.h" #include "blimp/net/stream_socket_connection.h"
#include "net/socket/client_socket_factory.h" #include "net/socket/client_socket_factory.h"
#include "net/socket/stream_socket.h" #include "net/socket/stream_socket.h"
...@@ -50,10 +51,10 @@ void TCPClientTransport::Connect(const net::CompletionCallback& callback) { ...@@ -50,10 +51,10 @@ void TCPClientTransport::Connect(const net::CompletionCallback& callback) {
OnTCPConnectComplete(result); OnTCPConnectComplete(result);
} }
std::unique_ptr<BlimpConnection> TCPClientTransport::TakeConnection() { std::unique_ptr<MessagePort> TCPClientTransport::TakeMessagePort() {
DCHECK(connect_callback_.is_null()); DCHECK(connect_callback_.is_null());
DCHECK(socket_); DCHECK(socket_);
return base::MakeUnique<StreamSocketConnection>(std::move(socket_)); return MessagePort::CreateForStreamSocketWithCompression(std::move(socket_));
} }
const char* TCPClientTransport::GetName() const { const char* TCPClientTransport::GetName() const {
......
...@@ -23,7 +23,7 @@ class StreamSocket; ...@@ -23,7 +23,7 @@ class StreamSocket;
namespace blimp { namespace blimp {
class BlimpConnection; class MessagePort;
// BlimpTransport which creates a TCP connection to one of the specified // BlimpTransport which creates a TCP connection to one of the specified
// |addresses| on each call to Connect(). // |addresses| on each call to Connect().
...@@ -37,7 +37,7 @@ class BLIMP_NET_EXPORT TCPClientTransport : public BlimpTransport { ...@@ -37,7 +37,7 @@ class BLIMP_NET_EXPORT TCPClientTransport : public BlimpTransport {
// BlimpTransport implementation. // BlimpTransport implementation.
void Connect(const net::CompletionCallback& callback) override; void Connect(const net::CompletionCallback& callback) override;
std::unique_ptr<BlimpConnection> TakeConnection() override; std::unique_ptr<MessagePort> TakeMessagePort() override;
const char* GetName() const override; const char* GetName() const override;
protected: protected:
......
...@@ -11,9 +11,8 @@ ...@@ -11,9 +11,8 @@
#include "base/callback_helpers.h" #include "base/callback_helpers.h"
#include "base/location.h" #include "base/location.h"
#include "base/memory/ptr_util.h" #include "base/memory/ptr_util.h"
#include "base/single_thread_task_runner.h"
#include "base/threading/thread_task_runner_handle.h" #include "base/threading/thread_task_runner_handle.h"
#include "blimp/net/stream_socket_connection.h" #include "blimp/net/message_port.h"
#include "net/socket/stream_socket.h" #include "net/socket/stream_socket.h"
#include "net/socket/tcp_server_socket.h" #include "net/socket/tcp_server_socket.h"
...@@ -51,7 +50,6 @@ void TCPEngineTransport::Connect(const net::CompletionCallback& callback) { ...@@ -51,7 +50,6 @@ void TCPEngineTransport::Connect(const net::CompletionCallback& callback) {
} }
if (result != net::OK) { if (result != net::OK) {
// TODO(haibinlu): investigate when we can keep using this server socket.
server_socket_.reset(); server_socket_.reset();
} }
...@@ -59,10 +57,11 @@ void TCPEngineTransport::Connect(const net::CompletionCallback& callback) { ...@@ -59,10 +57,11 @@ void TCPEngineTransport::Connect(const net::CompletionCallback& callback) {
base::Bind(callback, result)); base::Bind(callback, result));
} }
std::unique_ptr<BlimpConnection> TCPEngineTransport::TakeConnection() { std::unique_ptr<MessagePort> TCPEngineTransport::TakeMessagePort() {
DCHECK(connect_callback_.is_null()); DCHECK(connect_callback_.is_null());
DCHECK(accepted_socket_); DCHECK(accepted_socket_);
return base::MakeUnique<StreamSocketConnection>(std::move(accepted_socket_)); return MessagePort::CreateForStreamSocketWithCompression(
std::move(accepted_socket_));
} }
const char* TCPEngineTransport::GetName() const { const char* TCPEngineTransport::GetName() const {
......
...@@ -22,7 +22,7 @@ class StreamSocket; ...@@ -22,7 +22,7 @@ class StreamSocket;
namespace blimp { namespace blimp {
class BlimpConnection; class MessagePort;
// BlimpTransport which listens for a TCP connection at |address|. // BlimpTransport which listens for a TCP connection at |address|.
class BLIMP_NET_EXPORT TCPEngineTransport : public BlimpTransport { class BLIMP_NET_EXPORT TCPEngineTransport : public BlimpTransport {
...@@ -34,7 +34,7 @@ class BLIMP_NET_EXPORT TCPEngineTransport : public BlimpTransport { ...@@ -34,7 +34,7 @@ class BLIMP_NET_EXPORT TCPEngineTransport : public BlimpTransport {
// BlimpTransport implementation. // BlimpTransport implementation.
void Connect(const net::CompletionCallback& callback) override; void Connect(const net::CompletionCallback& callback) override;
std::unique_ptr<BlimpConnection> TakeConnection() override; std::unique_ptr<MessagePort> TakeMessagePort() override;
const char* GetName() const override; const char* GetName() const override;
int GetLocalAddress(net::IPEndPoint* address) const; int GetLocalAddress(net::IPEndPoint* address) const;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include <algorithm>
#include <memory> #include <memory>
#include <string> #include <string>
...@@ -31,24 +32,34 @@ namespace { ...@@ -31,24 +32,34 @@ namespace {
// Integration test for TCPEngineTransport and TCPClientTransport. // Integration test for TCPEngineTransport and TCPClientTransport.
class TCPTransportTest : public testing::Test { class TCPTransportTest : public testing::Test {
protected: protected:
TCPTransportTest() { TCPTransportTest()
net::IPEndPoint local_address(net::IPAddress(127, 0, 0, 1), 0); : local_address_(net::IPAddress(127, 0, 0, 1), 0),
engine_.reset(new TCPEngineTransport(local_address, nullptr)); engine_(local_address_, nullptr),
read_buffer_(new net::GrowableIOBuffer) {
size_t buf_size = std::max(payload_1_.size(), payload_2_.size());
write_buffer_ = make_scoped_refptr(
new net::DrainableIOBuffer(new net::IOBuffer(buf_size), buf_size));
read_buffer_->SetCapacity(buf_size);
} }
net::IPEndPoint GetLocalEndpoint() const { net::IPEndPoint GetLocalEndpoint() const {
net::IPEndPoint local_address; net::IPEndPoint local_address;
CHECK_EQ(net::OK, engine_->GetLocalAddress(&local_address)); CHECK_EQ(net::OK, engine_.GetLocalAddress(&local_address));
return local_address; return local_address;
} }
std::string payload_1_ = "foo";
std::string payload_2_ = "bar";
base::MessageLoopForIO message_loop_; base::MessageLoopForIO message_loop_;
std::unique_ptr<TCPEngineTransport> engine_; net::IPEndPoint local_address_;
TCPEngineTransport engine_;
scoped_refptr<net::DrainableIOBuffer> write_buffer_;
scoped_refptr<net::GrowableIOBuffer> read_buffer_;
}; };
TEST_F(TCPTransportTest, Connect) { TEST_F(TCPTransportTest, Connect) {
net::TestCompletionCallback accept_callback; net::TestCompletionCallback accept_callback;
engine_->Connect(accept_callback.callback()); engine_.Connect(accept_callback.callback());
net::TestCompletionCallback connect_callback; net::TestCompletionCallback connect_callback;
TCPClientTransport client(GetLocalEndpoint(), nullptr); TCPClientTransport client(GetLocalEndpoint(), nullptr);
...@@ -56,90 +67,67 @@ TEST_F(TCPTransportTest, Connect) { ...@@ -56,90 +67,67 @@ TEST_F(TCPTransportTest, Connect) {
EXPECT_EQ(net::OK, connect_callback.WaitForResult()); EXPECT_EQ(net::OK, connect_callback.WaitForResult());
EXPECT_EQ(net::OK, accept_callback.WaitForResult()); EXPECT_EQ(net::OK, accept_callback.WaitForResult());
EXPECT_TRUE(engine_->TakeConnection() != nullptr); EXPECT_NE(nullptr, client.TakeMessagePort());
} }
TEST_F(TCPTransportTest, TwoClientConnections) { TEST_F(TCPTransportTest, TwoClientConnections) {
net::TestCompletionCallback accept_callback1; net::TestCompletionCallback accept_callback1;
engine_->Connect(accept_callback1.callback()); engine_.Connect(accept_callback1.callback());
net::TestCompletionCallback connect_callback1; net::TestCompletionCallback connect_callback1;
TCPClientTransport client1(GetLocalEndpoint(), nullptr); TCPClientTransport client(GetLocalEndpoint(), nullptr);
client1.Connect(connect_callback1.callback()); client.Connect(connect_callback1.callback());
net::TestCompletionCallback connect_callback2;
TCPClientTransport client2(GetLocalEndpoint(), nullptr);
client2.Connect(connect_callback2.callback());
EXPECT_EQ(net::OK, connect_callback1.WaitForResult()); EXPECT_EQ(net::OK, connect_callback1.WaitForResult());
EXPECT_EQ(net::OK, accept_callback1.WaitForResult()); EXPECT_EQ(net::OK, accept_callback1.WaitForResult());
EXPECT_TRUE(engine_->TakeConnection() != nullptr); EXPECT_NE(nullptr, engine_.TakeMessagePort());
net::TestCompletionCallback accept_callback2; net::TestCompletionCallback accept_callback2;
engine_->Connect(accept_callback2.callback()); engine_.Connect(accept_callback2.callback());
net::TestCompletionCallback connect_callback2;
TCPClientTransport client2(GetLocalEndpoint(), nullptr);
client2.Connect(connect_callback2.callback());
EXPECT_EQ(net::OK, connect_callback2.WaitForResult()); EXPECT_EQ(net::OK, connect_callback2.WaitForResult());
EXPECT_EQ(net::OK, accept_callback2.WaitForResult()); EXPECT_EQ(net::OK, accept_callback2.WaitForResult());
EXPECT_TRUE(engine_->TakeConnection() != nullptr); EXPECT_NE(nullptr, engine_.TakeMessagePort());
} }
TEST_F(TCPTransportTest, ExchangeMessages) { TEST_F(TCPTransportTest, ExchangeMessages) {
// Start the Engine transport and connect a client to it. // Start the Engine transport and connect a client to it.
net::TestCompletionCallback accept_callback; net::TestCompletionCallback accept_callback;
engine_->Connect(accept_callback.callback()); engine_.Connect(accept_callback.callback());
net::TestCompletionCallback client_connect_callback; net::TestCompletionCallback client_connect_callback;
TCPClientTransport client(GetLocalEndpoint(), nullptr); TCPClientTransport client(GetLocalEndpoint(), nullptr);
client.Connect(client_connect_callback.callback()); client.Connect(client_connect_callback.callback());
EXPECT_EQ(net::OK, client_connect_callback.WaitForResult());
EXPECT_EQ(net::OK, accept_callback.WaitForResult()); EXPECT_EQ(net::OK, accept_callback.WaitForResult());
EXPECT_EQ(net::OK, client_connect_callback.WaitForResult());
// Expect the engine to get two messages from the client, and the client to std::unique_ptr<MessagePort> engine_message_port = engine_.TakeMessagePort();
// get one from the engine. std::unique_ptr<MessagePort> clientmessage_port = client.TakeMessagePort();
MockBlimpMessageProcessor engine_incoming_processor;
MockBlimpMessageProcessor client_incoming_processor; // Engine sends payload_1_ to client.
net::CompletionCallback engine_process_message_cb; net::TestCompletionCallback read_cb1;
std::unique_ptr<BlimpMessage> client_message1 = net::TestCompletionCallback write_cb1;
CreateStartConnectionMessage("", 0); memcpy(write_buffer_->data(), payload_1_.data(), payload_1_.size());
int client_message1_size = client_message1->ByteSize(); engine_message_port->writer()->WritePacket(write_buffer_,
std::unique_ptr<BlimpMessage> client_message2 = CreateCheckpointAckMessage(5); write_cb1.callback());
std::unique_ptr<BlimpMessage> engine_message = CreateCheckpointAckMessage(10); clientmessage_port->reader()->ReadPacket(read_buffer_, read_cb1.callback());
EXPECT_CALL(engine_incoming_processor, EXPECT_EQ(payload_1_.size(), static_cast<size_t>(read_cb1.WaitForResult()));
MockableProcessMessage(EqualsProto(*client_message1), _)) EXPECT_EQ(net::OK, write_cb1.WaitForResult());
.WillOnce(SaveArg<1>(&engine_process_message_cb)); EXPECT_TRUE(
EXPECT_CALL(engine_incoming_processor, BufferStartsWith(read_buffer_.get(), payload_1_.size(), payload_1_));
MockableProcessMessage(EqualsProto(*client_message2), _))
.Times(1); // Client sends payload_2_ to engine.
EXPECT_CALL(client_incoming_processor, net::TestCompletionCallback read_cb2;
MockableProcessMessage(EqualsProto(*engine_message), _)) net::TestCompletionCallback write_cb2;
.Times(1); memcpy(write_buffer_->data(), payload_2_.data(), payload_2_.size());
clientmessage_port->writer()->WritePacket(write_buffer_,
// Attach the ends of the connection to our mock message-processors. write_cb2.callback());
std::unique_ptr<BlimpConnection> engine_connnection = engine_message_port->reader()->ReadPacket(read_buffer_, read_cb2.callback());
engine_->TakeConnection(); EXPECT_EQ(payload_2_.size(), static_cast<size_t>(read_cb2.WaitForResult()));
std::unique_ptr<BlimpConnection> client_connnection = client.TakeConnection(); EXPECT_EQ(net::OK, write_cb2.WaitForResult());
engine_connnection->SetIncomingMessageProcessor(&engine_incoming_processor); EXPECT_TRUE(
client_connnection->SetIncomingMessageProcessor(&client_incoming_processor); BufferStartsWith(read_buffer_.get(), payload_2_.size(), payload_2_));
// Client sends the first message.
net::TestCompletionCallback client_send_callback1;
client_connnection->GetOutgoingMessageProcessor()->ProcessMessage(
std::move(client_message1), client_send_callback1.callback());
EXPECT_EQ(net::OK, client_send_callback1.WaitForResult());
// Engine finishes processing the client message.
EXPECT_FALSE(engine_process_message_cb.is_null());
engine_process_message_cb.Run(client_message1_size);
// Engine sends one message.
net::TestCompletionCallback engine_send_callback;
engine_connnection->GetOutgoingMessageProcessor()->ProcessMessage(
std::move(engine_message), engine_send_callback.callback());
EXPECT_EQ(net::OK, engine_send_callback.WaitForResult());
// Client sends the second message.
net::TestCompletionCallback client_send_callback2;
client_connnection->GetOutgoingMessageProcessor()->ProcessMessage(
std::move(client_message2), client_send_callback2.callback());
EXPECT_EQ(net::OK, client_send_callback2.WaitForResult());
} }
} // namespace } // namespace
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include "blimp/common/proto/blimp_message.pb.h" #include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/blimp_connection.h" #include "blimp/net/blimp_connection.h"
#include "blimp/net/common.h" #include "blimp/net/common.h"
#include "blimp/net/message_port.h"
#include "net/base/io_buffer.h" #include "net/base/io_buffer.h"
namespace blimp { namespace blimp {
...@@ -23,8 +24,8 @@ MockTransport::MockTransport() {} ...@@ -23,8 +24,8 @@ MockTransport::MockTransport() {}
MockTransport::~MockTransport() {} MockTransport::~MockTransport() {}
std::unique_ptr<BlimpConnection> MockTransport::TakeConnection() { std::unique_ptr<MessagePort> MockTransport::TakeMessagePort() {
return base::WrapUnique(TakeConnectionPtr()); return base::WrapUnique(TakeMessagePortPtr());
} }
const char* MockTransport::GetName() const { const char* MockTransport::GetName() const {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "blimp/net/blimp_transport.h" #include "blimp/net/blimp_transport.h"
#include "blimp/net/connection_error_observer.h" #include "blimp/net/connection_error_observer.h"
#include "blimp/net/connection_handler.h" #include "blimp/net/connection_handler.h"
#include "blimp/net/message_port.h"
#include "blimp/net/packet_reader.h" #include "blimp/net/packet_reader.h"
#include "blimp/net/packet_writer.h" #include "blimp/net/packet_writer.h"
#include "net/socket/stream_socket.h" #include "net/socket/stream_socket.h"
...@@ -139,9 +140,9 @@ class MockTransport : public BlimpTransport { ...@@ -139,9 +140,9 @@ class MockTransport : public BlimpTransport {
~MockTransport() override; ~MockTransport() override;
MOCK_METHOD1(Connect, void(const net::CompletionCallback& callback)); MOCK_METHOD1(Connect, void(const net::CompletionCallback& callback));
MOCK_METHOD0(TakeConnectionPtr, BlimpConnection*()); MOCK_METHOD0(TakeMessagePortPtr, MessagePort*());
std::unique_ptr<BlimpConnection> TakeConnection() override; std::unique_ptr<MessagePort> TakeMessagePort() override;
const char* GetName() const override; const char* GetName() const override;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment