Commit 7e018b36 authored by Robert Ogden's avatar Robert Ogden Committed by Commit Bot

Expose the closing socket from EmbeddedTestServer on response completed

Allows the connection listener to take ownership of a connection's
socket once a response has finished being written.

This is being exposed to allow the EmbeddedTestServer to act as a
CONNECT proxy tunnel for testing the IsolatedPrerender feature (see
chained CL).

Bug: 1067300
Change-Id: I47d627262f96a50b09f90a82d140456a797901c5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2148038
Commit-Queue: Robert Ogden <robertogden@chromium.org>
Reviewed-by: default avatarTarun Bansal <tbansal@chromium.org>
Reviewed-by: default avatarMatt Menke <mmenke@chromium.org>
Cr-Commit-Position: refs/heads/master@{#761118}
parent 84f651f5
...@@ -39,6 +39,9 @@ void EmbeddedTestServerAndroid::ConnectionListener::ReadFromSocket( ...@@ -39,6 +39,9 @@ void EmbeddedTestServerAndroid::ConnectionListener::ReadFromSocket(
test_server_android_->ReadFromSocket(static_cast<const void*>(&socket)); test_server_android_->ReadFromSocket(static_cast<const void*>(&socket));
} }
void EmbeddedTestServerAndroid::ConnectionListener::
OnResponseCompletedSuccessfully(std::unique_ptr<StreamSocket> socket) {}
EmbeddedTestServerAndroid::EmbeddedTestServerAndroid( EmbeddedTestServerAndroid::EmbeddedTestServerAndroid(
JNIEnv* env, JNIEnv* env,
const JavaRef<jobject>& jobj, const JavaRef<jobject>& jobj,
......
...@@ -79,6 +79,8 @@ class EmbeddedTestServerAndroid { ...@@ -79,6 +79,8 @@ class EmbeddedTestServerAndroid {
std::unique_ptr<StreamSocket> AcceptedSocket( std::unique_ptr<StreamSocket> AcceptedSocket(
std::unique_ptr<StreamSocket> socket) override; std::unique_ptr<StreamSocket> socket) override;
void ReadFromSocket(const StreamSocket& socket, int rv) override; void ReadFromSocket(const StreamSocket& socket, int rv) override;
void OnResponseCompletedSuccessfully(
std::unique_ptr<StreamSocket> socket) override;
private: private:
EmbeddedTestServerAndroid* test_server_android_; EmbeddedTestServerAndroid* test_server_android_;
......
...@@ -388,8 +388,8 @@ void EmbeddedTestServer::HandleRequest(HttpConnection* connection, ...@@ -388,8 +388,8 @@ void EmbeddedTestServer::HandleRequest(HttpConnection* connection,
response->SendResponse( response->SendResponse(
base::BindRepeating(&HttpConnection::SendResponseBytes, base::BindRepeating(&HttpConnection::SendResponseBytes,
connection->GetWeakPtr()), connection->GetWeakPtr()),
base::BindOnce(&EmbeddedTestServer::DidClose, weak_factory_.GetWeakPtr(), base::BindOnce(&EmbeddedTestServer::OnResponseCompleted,
connection)); weak_factory_.GetWeakPtr(), connection));
} }
GURL EmbeddedTestServer::GetURL(const std::string& relative_url) const { GURL EmbeddedTestServer::GetURL(const std::string& relative_url) const {
...@@ -642,6 +642,22 @@ bool EmbeddedTestServer::HandleReadResult(HttpConnection* connection, int rv) { ...@@ -642,6 +642,22 @@ bool EmbeddedTestServer::HandleReadResult(HttpConnection* connection, int rv) {
return true; return true;
} }
void EmbeddedTestServer::OnResponseCompleted(HttpConnection* connection) {
DCHECK(io_thread_->task_runner()->BelongsToCurrentThread());
DCHECK(connection);
DCHECK_EQ(1u, connections_.count(connection->socket_.get()));
std::unique_ptr<StreamSocket> socket = std::move(connection->socket_);
connections_.erase(socket.get());
// |connection| is now invalid, don't use it again.
// Only allow the connection listener to take the socket if it is still open.
if (socket->IsConnected() && connection_listener_) {
connection_listener_->OnResponseCompletedSuccessfully(std::move(socket));
}
}
void EmbeddedTestServer::DidClose(HttpConnection* connection) { void EmbeddedTestServer::DidClose(HttpConnection* connection) {
DCHECK(io_thread_->task_runner()->BelongsToCurrentThread()); DCHECK(io_thread_->task_runner()->BelongsToCurrentThread());
DCHECK(connection); DCHECK(connection);
......
...@@ -307,6 +307,11 @@ class EmbeddedTestServer { ...@@ -307,6 +307,11 @@ class EmbeddedTestServer {
// request has been received. // request has been received.
bool HandleReadResult(HttpConnection* connection, int rv); bool HandleReadResult(HttpConnection* connection, int rv);
// Called when |connection| is finished writing the response and the socket
// can be closed, allowing for |connnection_listener_| to take it if the
// socket is still open.
void OnResponseCompleted(HttpConnection* connection);
// Closes and removes the connection upon error or completion. // Closes and removes the connection upon error or completion.
void DidClose(HttpConnection* connection); void DidClose(HttpConnection* connection);
......
...@@ -24,6 +24,13 @@ class EmbeddedTestServerConnectionListener { ...@@ -24,6 +24,13 @@ class EmbeddedTestServerConnectionListener {
// Notified when a socket was read from by the EmbeddedTestServer. // Notified when a socket was read from by the EmbeddedTestServer.
virtual void ReadFromSocket(const StreamSocket& socket, int rv) = 0; virtual void ReadFromSocket(const StreamSocket& socket, int rv) = 0;
// Notified when the EmbeddedTestServer has completed a request and response
// successfully on |socket|. The listener can take |socket| to manually handle
// further traffic on it (for example, if doing a proxy tunnel), otherwise
// |socket| is destroyed.
virtual void OnResponseCompletedSuccessfully(
std::unique_ptr<StreamSocket> socket) {}
protected: protected:
EmbeddedTestServerConnectionListener() {} EmbeddedTestServerConnectionListener() {}
......
...@@ -76,6 +76,7 @@ class TestConnectionListener ...@@ -76,6 +76,7 @@ class TestConnectionListener
TestConnectionListener() TestConnectionListener()
: socket_accepted_count_(0), : socket_accepted_count_(0),
did_read_from_socket_(false), did_read_from_socket_(false),
did_get_socket_on_complete_(false),
task_runner_(base::ThreadTaskRunnerHandle::Get()) {} task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
~TestConnectionListener() override = default; ~TestConnectionListener() override = default;
...@@ -97,6 +98,12 @@ class TestConnectionListener ...@@ -97,6 +98,12 @@ class TestConnectionListener
did_read_from_socket_ = true; did_read_from_socket_ = true;
} }
void OnResponseCompletedSuccessfully(
std::unique_ptr<StreamSocket> socket) override {
base::AutoLock lock(lock_);
did_get_socket_on_complete_ = socket && socket->IsConnected();
}
void WaitUntilFirstConnectionAccepted() { accept_loop_.Run(); } void WaitUntilFirstConnectionAccepted() { accept_loop_.Run(); }
size_t SocketAcceptedCount() const { size_t SocketAcceptedCount() const {
...@@ -109,9 +116,15 @@ class TestConnectionListener ...@@ -109,9 +116,15 @@ class TestConnectionListener
return did_read_from_socket_; return did_read_from_socket_;
} }
bool DidGetSocketOnComplete() const {
base::AutoLock lock(lock_);
return did_get_socket_on_complete_;
}
private: private:
size_t socket_accepted_count_; size_t socket_accepted_count_;
bool did_read_from_socket_; bool did_read_from_socket_;
bool did_get_socket_on_complete_;
base::RunLoop accept_loop_; base::RunLoop accept_loop_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_; scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
...@@ -329,6 +342,7 @@ TEST_P(EmbeddedTestServerTest, ConnectionListenerAccept) { ...@@ -329,6 +342,7 @@ TEST_P(EmbeddedTestServerTest, ConnectionListenerAccept) {
EXPECT_EQ(1u, connection_listener_.SocketAcceptedCount()); EXPECT_EQ(1u, connection_listener_.SocketAcceptedCount());
EXPECT_FALSE(connection_listener_.DidReadFromSocket()); EXPECT_FALSE(connection_listener_.DidReadFromSocket());
EXPECT_FALSE(connection_listener_.DidGetSocketOnComplete());
} }
TEST_P(EmbeddedTestServerTest, ConnectionListenerRead) { TEST_P(EmbeddedTestServerTest, ConnectionListenerRead) {
...@@ -343,6 +357,22 @@ TEST_P(EmbeddedTestServerTest, ConnectionListenerRead) { ...@@ -343,6 +357,22 @@ TEST_P(EmbeddedTestServerTest, ConnectionListenerRead) {
WaitForResponses(1); WaitForResponses(1);
EXPECT_EQ(1u, connection_listener_.SocketAcceptedCount()); EXPECT_EQ(1u, connection_listener_.SocketAcceptedCount());
EXPECT_TRUE(connection_listener_.DidReadFromSocket()); EXPECT_TRUE(connection_listener_.DidReadFromSocket());
EXPECT_TRUE(connection_listener_.DidGetSocketOnComplete());
}
TEST_P(EmbeddedTestServerTest, ConnectionListenerComplete) {
ASSERT_TRUE(server_->Start());
std::unique_ptr<URLFetcher> fetcher =
URLFetcher::Create(server_->GetURL("/non-existent"), URLFetcher::GET,
this, TRAFFIC_ANNOTATION_FOR_TESTS);
fetcher->SetRequestContext(request_context_getter_.get());
fetcher->Start();
WaitForResponses(1);
EXPECT_EQ(1u, connection_listener_.SocketAcceptedCount());
EXPECT_TRUE(connection_listener_.DidReadFromSocket());
EXPECT_TRUE(connection_listener_.DidGetSocketOnComplete());
} }
TEST_P(EmbeddedTestServerTest, ConcurrentFetches) { TEST_P(EmbeddedTestServerTest, ConcurrentFetches) {
......
...@@ -36,5 +36,8 @@ void SimpleConnectionListener::WaitForConnections() { ...@@ -36,5 +36,8 @@ void SimpleConnectionListener::WaitForConnections() {
run_loop_.Run(); run_loop_.Run();
} }
void SimpleConnectionListener::OnResponseCompletedSuccessfully(
std::unique_ptr<StreamSocket> socket) {}
} // namespace test_server } // namespace test_server
} // namespace net } // namespace net
...@@ -40,6 +40,8 @@ class SimpleConnectionListener : public EmbeddedTestServerConnectionListener { ...@@ -40,6 +40,8 @@ class SimpleConnectionListener : public EmbeddedTestServerConnectionListener {
std::unique_ptr<StreamSocket> AcceptedSocket( std::unique_ptr<StreamSocket> AcceptedSocket(
std::unique_ptr<StreamSocket> socket) override; std::unique_ptr<StreamSocket> socket) override;
void ReadFromSocket(const StreamSocket& socket, int rv) override; void ReadFromSocket(const StreamSocket& socket, int rv) override;
void OnResponseCompletedSuccessfully(
std::unique_ptr<StreamSocket> socket) override;
// Wait until the expected number of connections have been seen. // Wait until the expected number of connections have been seen.
void WaitForConnections(); void WaitForConnections();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment