Commit 29531a66 authored by Eric Willigers's avatar Eric Willigers Committed by Chromium LUCI CQ

Direct Sockets: Browser test for reading from TCP socket

We add a browser test that reads from the socket using script.

Bug: 905818
Change-Id: I5addd5bca78325bb549a86e8443773c893946561
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2629187Reviewed-by: default avatarGlen Robertson <glenrob@chromium.org>
Commit-Queue: Eric Willigers <ericwilligers@chromium.org>
Cr-Commit-Position: refs/heads/master@{#843811}
parent d9887c40
...@@ -190,25 +190,34 @@ net::Error UnconditionallyPermitConnection( ...@@ -190,25 +190,34 @@ net::Error UnconditionallyPermitConnection(
return net::OK; return net::OK;
} }
class ReadWaiter { class ReadWriteWaiter {
public: public:
explicit ReadWaiter(uint32_t required_bytes) ReadWriteWaiter(
: required_bytes_(required_bytes) {} uint32_t required_receive_bytes,
uint32_t required_send_bytes,
void Init(mojo::Remote<network::mojom::TCPServerSocket>& tcp_server_socket) { mojo::Remote<network::mojom::TCPServerSocket>& tcp_server_socket)
: required_receive_bytes_(required_receive_bytes),
required_send_bytes_(required_send_bytes) {
tcp_server_socket->Accept( tcp_server_socket->Accept(
/*observer=*/mojo::NullRemote(), /*observer=*/mojo::NullRemote(),
base::BindLambdaForTesting( base::BindRepeating(&ReadWriteWaiter::OnAccept,
[this](int result, base::Unretained(this)));
}
void Await() { run_loop_.Run(); }
private:
void OnAccept(
int result,
const base::Optional<net::IPEndPoint>& remote_addr, const base::Optional<net::IPEndPoint>& remote_addr,
mojo::PendingRemote<network::mojom::TCPConnectedSocket> mojo::PendingRemote<network::mojom::TCPConnectedSocket> accepted_socket,
accepted_socket,
mojo::ScopedDataPipeConsumerHandle consumer_handle, mojo::ScopedDataPipeConsumerHandle consumer_handle,
mojo::ScopedDataPipeProducerHandle producer_handle) { mojo::ScopedDataPipeProducerHandle producer_handle) {
DCHECK(!accepted_socket_);
DCHECK_EQ(result, net::OK); DCHECK_EQ(result, net::OK);
DCHECK(!accepted_socket_);
accepted_socket_.Bind(std::move(accepted_socket)); accepted_socket_.Bind(std::move(accepted_socket));
if (required_receive_bytes_ > 0) {
receive_stream_ = std::move(consumer_handle); receive_stream_ = std::move(consumer_handle);
read_watcher_ = std::make_unique<mojo::SimpleWatcher>( read_watcher_ = std::make_unique<mojo::SimpleWatcher>(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL); FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
...@@ -216,27 +225,40 @@ class ReadWaiter { ...@@ -216,27 +225,40 @@ class ReadWaiter {
receive_stream_.get(), receive_stream_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED, MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&ReadWaiter::OnReadReady, base::BindRepeating(&ReadWriteWaiter::OnReadReady,
base::Unretained(this))); base::Unretained(this)));
read_watcher_->ArmOrNotify(); read_watcher_->ArmOrNotify();
}));
} }
void Await() { run_loop_.Run(); } if (required_send_bytes_ > 0) {
send_stream_ = std::move(producer_handle);
write_watcher_ = std::make_unique<mojo::SimpleWatcher>(
FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL);
write_watcher_->Watch(
send_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&ReadWriteWaiter::OnWriteReady,
base::Unretained(this)));
write_watcher_->ArmOrNotify();
}
}
private:
void OnReadReady(MojoResult result, const mojo::HandleSignalsState& state) { void OnReadReady(MojoResult result, const mojo::HandleSignalsState& state) {
ReadData(); ReadData();
} }
void OnWriteReady(MojoResult result, const mojo::HandleSignalsState& state) {
WriteData();
}
void ReadData() { void ReadData() {
while (true) { while (true) {
DCHECK(receive_stream_.is_valid()); DCHECK(receive_stream_.is_valid());
DCHECK_LT(bytes_read_, required_bytes_); DCHECK_LT(bytes_received_, required_receive_bytes_);
const void* buffer = nullptr; const void* buffer = nullptr;
uint32_t num_bytes = 0; uint32_t num_bytes = 0;
const MojoResult mojo_result = receive_stream_->BeginReadData( MojoResult mojo_result = receive_stream_->BeginReadData(
&buffer, &num_bytes, MOJO_READ_DATA_FLAG_NONE); &buffer, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
if (mojo_result == MOJO_RESULT_SHOULD_WAIT) { if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
read_watcher_->ArmOrNotify(); read_watcher_->ArmOrNotify();
...@@ -250,25 +272,70 @@ class ReadWaiter { ...@@ -250,25 +272,70 @@ class ReadWaiter {
const unsigned char* current = static_cast<const unsigned char*>(buffer); const unsigned char* current = static_cast<const unsigned char*>(buffer);
const unsigned char* const end = current + num_bytes; const unsigned char* const end = current + num_bytes;
while (current < end) { while (current < end) {
EXPECT_EQ(*current, bytes_read_ % 256); EXPECT_EQ(*current, bytes_received_ % 256);
++current; ++current;
++bytes_read_; ++bytes_received_;
} }
receive_stream_->EndReadData(num_bytes); mojo_result = receive_stream_->EndReadData(num_bytes);
if (bytes_read_ == required_bytes_) { DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
if (bytes_received_ == required_receive_bytes_) {
if (bytes_sent_ == required_send_bytes_)
run_loop_.Quit(); run_loop_.Quit();
return; return;
} }
} }
} }
void WriteData() {
while (true) {
DCHECK(send_stream_.is_valid());
DCHECK_LT(bytes_sent_, required_send_bytes_);
void* buffer = nullptr;
uint32_t num_bytes = 0;
MojoResult mojo_result = send_stream_->BeginWriteData(
&buffer, &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
write_watcher_->ArmOrNotify();
return;
}
DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
// This is guaranteed by Mojo.
DCHECK_GT(num_bytes, 0u);
num_bytes = std::min(num_bytes, required_send_bytes_ - bytes_sent_);
unsigned char* current = static_cast<unsigned char*>(buffer);
unsigned char* const end = current + num_bytes;
while (current != end) {
*current = bytes_sent_ % 256;
++current;
++bytes_sent_;
}
mojo_result = send_stream_->EndWriteData(num_bytes);
DCHECK_EQ(mojo_result, MOJO_RESULT_OK);
if (bytes_sent_ == required_send_bytes_) {
if (bytes_received_ == required_receive_bytes_)
run_loop_.Quit();
return;
}
}
}
const uint32_t required_receive_bytes_;
const uint32_t required_send_bytes_;
base::RunLoop run_loop_; base::RunLoop run_loop_;
const uint32_t required_bytes_;
mojo::Remote<network::mojom::TCPConnectedSocket> accepted_socket_; mojo::Remote<network::mojom::TCPConnectedSocket> accepted_socket_;
mojo::ScopedDataPipeConsumerHandle receive_stream_; mojo::ScopedDataPipeConsumerHandle receive_stream_;
mojo::ScopedDataPipeProducerHandle send_stream_;
std::unique_ptr<mojo::SimpleWatcher> read_watcher_; std::unique_ptr<mojo::SimpleWatcher> read_watcher_;
uint32_t bytes_read_ = 0; std::unique_ptr<mojo::SimpleWatcher> write_watcher_;
uint32_t bytes_received_ = 0;
uint32_t bytes_sent_ = 0;
}; };
} // anonymous namespace } // anonymous namespace
...@@ -499,15 +566,47 @@ IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, WriteTcp) { ...@@ -499,15 +566,47 @@ IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, WriteTcp) {
const uint32_t kRequiredBytes = 10000; const uint32_t kRequiredBytes = 10000;
EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL())); EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL()));
ReadWaiter read_waiter(kRequiredBytes);
const uint16_t listening_port = StartTcpServer(); const uint16_t listening_port = StartTcpServer();
read_waiter.Init(tcp_server_socket()); ReadWriteWaiter waiter(/*required_receive_bytes=*/kRequiredBytes,
/*required_send_bytes=*/0, tcp_server_socket());
const std::string script = base::StringPrintf( const std::string script = base::StringPrintf(
"writeTcp({remoteAddress: '127.0.0.1', remotePort: %d}, %u)", "writeTcp({remoteAddress: '127.0.0.1', remotePort: %d}, %u)",
listening_port, kRequiredBytes); listening_port, kRequiredBytes);
EXPECT_EQ("write succeeded", EvalJs(shell(), script)); EXPECT_EQ("write succeeded", EvalJs(shell(), script));
read_waiter.Await(); waiter.Await();
}
IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, ReadTcp) {
const uint32_t kRequiredBytes = 150000;
EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL()));
const uint16_t listening_port = StartTcpServer();
ReadWriteWaiter waiter(/*required_receive_bytes=*/0,
/*required_send_bytes=*/kRequiredBytes,
tcp_server_socket());
const std::string script = base::StringPrintf(
"readTcp({remoteAddress: '127.0.0.1', remotePort: %d}, %u)",
listening_port, kRequiredBytes);
EXPECT_EQ("read succeeded", EvalJs(shell(), script));
waiter.Await();
}
IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, ReadWriteTcp) {
const uint32_t kRequiredBytes = 1000;
EXPECT_TRUE(NavigateToURL(shell(), GetTestPageURL()));
const uint16_t listening_port = StartTcpServer();
ReadWriteWaiter waiter(/*required_receive_bytes=*/kRequiredBytes,
/*required_send_bytes=*/kRequiredBytes,
tcp_server_socket());
const std::string script = base::StringPrintf(
"readWriteTcp({remoteAddress: '127.0.0.1', remotePort: %d}, %u)",
listening_port, kRequiredBytes);
EXPECT_EQ("readWrite succeeded", EvalJs(shell(), script));
waiter.Await();
} }
IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, CloseTcp) { IN_PROC_BROWSER_TEST_F(DirectSocketsBrowserTest, CloseTcp) {
......
...@@ -45,6 +45,54 @@ ...@@ -45,6 +45,54 @@
} }
} }
async function readLoop(reader, requiredBytes) {
if (!(reader instanceof ReadableStreamDefaultReader))
return 'read failed: reader is not a ReadableStreamDefaultReader';
let bytesRead = 0;
while (bytesRead < requiredBytes) {
const { value, done } = await reader.read();
if (done)
return 'read failed: unexpected stream close';
if (!value || value.length === 0)
return 'read failed: no data returned';
for (let index = 0; index < value.length; ++index) {
if (value[index] !== bytesRead % 256)
return 'read failed: bad data returned';
++bytesRead;
}
}
return 'read succeeded';
}
async function readTcp(options, requiredBytes) {
try {
let tcpSocket = await navigator.openTCPSocket(options);
let reader = tcpSocket.readable.getReader();
return await readLoop(reader, requiredBytes);
} catch(error) {
return ('readTcp failed: ' + error);
}
}
async function readWriteTcp(options, requiredBytes) {
try {
let tcpSocket = await navigator.openTCPSocket(options);
let reader = tcpSocket.readable.getReader();
let writer = tcpSocket.writable.getWriter();
let [readResult, writeResult] =
await Promise.all([readLoop(reader, requiredBytes),
writeLoop(writer, requiredBytes)]);
if (readResult !== 'read succeeded')
return readResult;
if (writeResult !== 'write succeeded')
return writeResult;
return 'readWrite succeeded';
} catch(error) {
return ('readWriteTcp failed: ' + error);
}
}
async function closeTcp(options, closeWriter) { async function closeTcp(options, closeWriter) {
try { try {
let tcpSocket = await navigator.openTCPSocket(options); let tcpSocket = await navigator.openTCPSocket(options);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment