Commit 31ca50f1 authored by rch's avatar rch Committed by Commit bot

Factor out the QUIC socket reading code into a stand alone QuicPacketReader

and share it with both the QuicClientSession and QuicSimpleClient.

BUG=

Review URL: https://codereview.chromium.org/1009803003

Cr-Commit-Position: refs/heads/master@{#321522}
parent 64461c4d
......@@ -926,6 +926,8 @@
'quic/quic_packet_creator.h',
'quic/quic_packet_generator.cc',
'quic/quic_packet_generator.h',
'quic/quic_packet_reader.cc',
'quic/quic_packet_reader.h',
'quic/quic_packet_writer.h',
'quic/quic_protocol.cc',
'quic/quic_protocol.h',
......
......@@ -166,16 +166,14 @@ QuicClientSession::QuicClientSession(
require_confirmation_(false),
stream_factory_(stream_factory),
socket_(socket.Pass()),
read_buffer_(new IOBufferWithSize(kMaxPacketSize)),
transport_security_state_(transport_security_state),
server_info_(server_info.Pass()),
read_pending_(false),
num_total_streams_(0),
task_runner_(task_runner),
net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_QUIC_SESSION)),
packet_reader_(socket_.get(), this, net_log_),
dns_resolution_end_time_(dns_resolution_end_time),
logger_(new QuicConnectionLogger(this, connection_description, net_log_)),
num_packets_read_(0),
going_away_(false),
weak_factory_(this) {
connection->set_debug_visitor(logger_.get());
......@@ -767,32 +765,7 @@ void QuicClientSession::OnProofVerifyDetailsAvailable(
}
void QuicClientSession::StartReading() {
if (read_pending_) {
return;
}
read_pending_ = true;
int rv = socket_->Read(read_buffer_.get(),
read_buffer_->size(),
base::Bind(&QuicClientSession::OnReadComplete,
weak_factory_.GetWeakPtr()));
UMA_HISTOGRAM_BOOLEAN("Net.QuicSession.AsyncRead", rv == ERR_IO_PENDING);
if (rv == ERR_IO_PENDING) {
num_packets_read_ = 0;
return;
}
if (++num_packets_read_ > 32) {
num_packets_read_ = 0;
// Data was read, process it.
// Schedule the work through the message loop to 1) prevent infinite
// recursion and 2) avoid blocking the thread for too long.
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&QuicClientSession::OnReadComplete,
weak_factory_.GetWeakPtr(), rv));
} else {
OnReadComplete(rv);
}
packet_reader_.StartReading();
}
void QuicClientSession::CloseSessionOnError(int error) {
......@@ -874,33 +847,23 @@ base::WeakPtr<QuicClientSession> QuicClientSession::GetWeakPtr() {
return weak_factory_.GetWeakPtr();
}
void QuicClientSession::OnReadComplete(int result) {
read_pending_ = false;
if (result == 0)
result = ERR_CONNECTION_CLOSED;
if (result < 0) {
DVLOG(1) << "Closing session on read error: " << result;
UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result);
NotifyFactoryOfSessionGoingAway();
CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR);
NotifyFactoryOfSessionClosedLater();
return;
}
void QuicClientSession::OnReadError(int result) {
DVLOG(1) << "Closing session on read error: " << result;
UMA_HISTOGRAM_SPARSE_SLOWLY("Net.QuicSession.ReadError", -result);
NotifyFactoryOfSessionGoingAway();
CloseSessionOnErrorInner(result, QUIC_PACKET_READ_ERROR);
NotifyFactoryOfSessionClosedLater();
}
QuicEncryptedPacket packet(read_buffer_->data(), result);
IPEndPoint local_address;
IPEndPoint peer_address;
socket_->GetLocalAddress(&local_address);
socket_->GetPeerAddress(&peer_address);
// ProcessUdpPacket might result in |this| being deleted, so we
// use a weak pointer to be safe.
bool QuicClientSession::OnPacket(const QuicEncryptedPacket& packet,
IPEndPoint local_address,
IPEndPoint peer_address) {
connection()->ProcessUdpPacket(local_address, peer_address, packet);
if (!connection()->connected()) {
NotifyFactoryOfSessionClosedLater();
return;
return false;
}
StartReading();
return true;
}
void QuicClientSession::NotifyFactoryOfSessionGoingAway() {
......
......@@ -21,6 +21,7 @@
#include "net/quic/quic_client_session_base.h"
#include "net/quic/quic_connection_logger.h"
#include "net/quic/quic_crypto_client_stream.h"
#include "net/quic/quic_packet_reader.h"
#include "net/quic/quic_protocol.h"
#include "net/quic/quic_reliable_client_stream.h"
......@@ -40,7 +41,8 @@ namespace test {
class QuicClientSessionPeer;
} // namespace test
class NET_EXPORT_PRIVATE QuicClientSession : public QuicClientSessionBase {
class NET_EXPORT_PRIVATE QuicClientSession : public QuicClientSessionBase,
public QuicPacketReader::Visitor {
public:
// An interface for observing events on a session.
class NET_EXPORT_PRIVATE Observer {
......@@ -148,6 +150,12 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicClientSessionBase {
void OnConnectionClosed(QuicErrorCode error, bool from_peer) override;
void OnSuccessfulVersionNegotiation(const QuicVersion& version) override;
// QuicPacketReader::Visitor methods:
void OnReadError(int result) override;
bool OnPacket(const QuicEncryptedPacket& packet,
IPEndPoint local_address,
IPEndPoint peer_address) override;
// Performs a crypto handshake with the server.
int CryptoConnect(bool require_confirmation,
const CompletionCallback& callback);
......@@ -228,23 +236,20 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicClientSessionBase {
scoped_ptr<QuicCryptoClientStream> crypto_stream_;
QuicStreamFactory* stream_factory_;
scoped_ptr<DatagramClientSocket> socket_;
scoped_refptr<IOBufferWithSize> read_buffer_;
TransportSecurityState* transport_security_state_;
scoped_ptr<QuicServerInfo> server_info_;
scoped_ptr<CertVerifyResult> cert_verify_result_;
std::string pinning_failure_log_;
ObserverSet observers_;
StreamRequestQueue stream_requests_;
bool read_pending_;
CompletionCallback callback_;
size_t num_total_streams_;
base::TaskRunner* task_runner_;
BoundNetLog net_log_;
QuicPacketReader packet_reader_;
base::TimeTicks dns_resolution_end_time_;
base::TimeTicks handshake_start_; // Time the handshake was started.
scoped_ptr<QuicConnectionLogger> logger_;
// Number of packets read in the current read loop.
size_t num_packets_read_;
// True when the session is going away, and streams may no longer be created
// on this session. Existing stream will continue to be processed.
bool going_away_;
......
// Copyright (c) 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/quic_packet_reader.h"
#include "base/metrics/histogram.h"
#include "net/base/net_errors.h"
namespace net {
QuicPacketReader::QuicPacketReader(DatagramClientSocket* socket,
Visitor* visitor,
const BoundNetLog& net_log)
: socket_(socket),
visitor_(visitor),
read_pending_(false),
num_packets_read_(0),
read_buffer_(new IOBufferWithSize(kMaxPacketSize)),
net_log_(net_log),
weak_factory_(this) {
}
QuicPacketReader::~QuicPacketReader() {
}
void QuicPacketReader::StartReading() {
if (read_pending_)
return;
DCHECK(socket_);
read_pending_ = true;
int rv = socket_->Read(read_buffer_.get(), read_buffer_->size(),
base::Bind(&QuicPacketReader::OnReadComplete,
weak_factory_.GetWeakPtr()));
UMA_HISTOGRAM_BOOLEAN("Net.QuicSession.AsyncRead", rv == ERR_IO_PENDING);
if (rv == ERR_IO_PENDING) {
num_packets_read_ = 0;
return;
}
if (++num_packets_read_ > 32) {
num_packets_read_ = 0;
// Data was read, process it.
// Schedule the work through the message loop to 1) prevent infinite
// recursion and 2) avoid blocking the thread for too long.
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(&QuicPacketReader::OnReadComplete,
weak_factory_.GetWeakPtr(), rv));
} else {
OnReadComplete(rv);
}
}
void QuicPacketReader::OnReadComplete(int result) {
read_pending_ = false;
if (result == 0)
result = ERR_CONNECTION_CLOSED;
if (result < 0) {
visitor_->OnReadError(result);
return;
}
QuicEncryptedPacket packet(read_buffer_->data(), result);
IPEndPoint local_address;
IPEndPoint peer_address;
socket_->GetLocalAddress(&local_address);
socket_->GetPeerAddress(&peer_address);
if (!visitor_->OnPacket(packet, local_address, peer_address))
return;
StartReading();
}
} // namespace net
// Copyright (c) 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
#ifndef NET_QUIC_QUIC_PACKET_READER_H_
#define NET_QUIC_QUIC_PACKET_READER_H_
#include "base/memory/weak_ptr.h"
#include "net/base/io_buffer.h"
#include "net/base/net_export.h"
#include "net/base/net_log.h"
#include "net/quic/quic_protocol.h"
#include "net/udp/datagram_client_socket.h"
namespace net {
class NET_EXPORT_PRIVATE QuicPacketReader {
public:
class NET_EXPORT_PRIVATE Visitor {
public:
virtual ~Visitor() {};
virtual void OnReadError(int result) = 0;
virtual bool OnPacket(const QuicEncryptedPacket& packet,
IPEndPoint local_address,
IPEndPoint peer_address) = 0;
};
QuicPacketReader(DatagramClientSocket* socket,
Visitor* visitor,
const BoundNetLog& net_log);
~QuicPacketReader();
// Causes the QuicConnectionHelper to start reading from the socket
// and passing the data along to the QuicConnection.
void StartReading();
private:
// A completion callback invoked when a read completes.
void OnReadComplete(int result);
DatagramClientSocket* socket_;
Visitor* visitor_;
bool read_pending_;
int num_packets_read_;
scoped_refptr<IOBufferWithSize> read_buffer_;
BoundNetLog net_log_;
base::WeakPtrFactory<QuicPacketReader> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(QuicPacketReader);
};
} // namespace net
#endif // NET_QUIC_QUIC_PACKET_READER_H_
......@@ -21,13 +21,6 @@ using std::vector;
namespace net {
namespace tools {
namespace {
// Allocate some extra space so we can send an error if the server goes over
// the limit.
const int kReadBufferSize = 2 * kMaxPacketSize;
} // namespace
QuicSimpleClient::QuicSimpleClient(IPEndPoint server_address,
const QuicServerId& server_id,
......@@ -38,9 +31,6 @@ QuicSimpleClient::QuicSimpleClient(IPEndPoint server_address,
helper_(CreateQuicConnectionHelper()),
initialized_(false),
supported_versions_(supported_versions),
read_pending_(false),
synchronous_read_count_(0),
read_buffer_(new IOBufferWithSize(kReadBufferSize)),
weak_factory_(this) {
}
......@@ -55,9 +45,6 @@ QuicSimpleClient::QuicSimpleClient(IPEndPoint server_address,
helper_(CreateQuicConnectionHelper()),
initialized_(false),
supported_versions_(supported_versions),
read_pending_(false),
synchronous_read_count_(0),
read_buffer_(new IOBufferWithSize(kReadBufferSize)),
weak_factory_(this) {
}
......@@ -135,8 +122,8 @@ bool QuicSimpleClient::CreateUDPSocket() {
}
socket_.swap(socket);
read_pending_ = false;
packet_reader_.reset(new QuicPacketReader(socket_.get(), this,
BoundNetLog()));
if (socket != nullptr) {
socket->Close();
......@@ -147,7 +134,7 @@ bool QuicSimpleClient::CreateUDPSocket() {
bool QuicSimpleClient::Connect() {
StartConnect();
StartReading();
packet_reader_->StartReading();
while (EncryptionBeingEstablished()) {
WaitForEvents();
}
......@@ -185,8 +172,7 @@ void QuicSimpleClient::Disconnect() {
}
writer_.reset();
read_pending_ = false;
packet_reader_.reset();
initialized_ = false;
}
......@@ -309,58 +295,20 @@ QuicPacketWriter* QuicSimpleClient::CreateQuicPacketWriter() {
return new QuicDefaultPacketWriter(socket_.get());
}
void QuicSimpleClient::StartReading() {
if (read_pending_) {
return;
}
read_pending_ = true;
int result = socket_->Read(
read_buffer_.get(),
read_buffer_->size(),
base::Bind(&QuicSimpleClient::OnReadComplete,
weak_factory_.GetWeakPtr()));
if (result == ERR_IO_PENDING) {
synchronous_read_count_ = 0;
return;
}
if (++synchronous_read_count_ > 32) {
synchronous_read_count_ = 0;
// Schedule the processing through the message loop to 1) prevent infinite
// recursion and 2) avoid blocking the thread for too long.
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&QuicSimpleClient::OnReadComplete,
weak_factory_.GetWeakPtr(), result));
} else {
OnReadComplete(result);
}
void QuicSimpleClient::OnReadError(int result) {
LOG(ERROR) << "QuicSimpleClient read failed: " << ErrorToString(result);
Disconnect();
}
void QuicSimpleClient::OnReadComplete(int result) {
read_pending_ = false;
if (result == 0)
result = ERR_CONNECTION_CLOSED;
if (result < 0) {
LOG(ERROR) << "QuicSimpleClient read failed: " << ErrorToString(result);
Disconnect();
return;
}
QuicEncryptedPacket packet(read_buffer_->data(), result);
IPEndPoint local_address;
IPEndPoint peer_address;
socket_->GetLocalAddress(&local_address);
socket_->GetPeerAddress(&peer_address);
bool QuicSimpleClient::OnPacket(const QuicEncryptedPacket& packet,
IPEndPoint local_address,
IPEndPoint peer_address) {
session_->connection()->ProcessUdpPacket(local_address, peer_address, packet);
if (!session_->connection()->connected()) {
return;
return false;
}
StartReading();
return true;
}
} // namespace tools
......
......@@ -22,6 +22,7 @@
#include "net/quic/quic_config.h"
#include "net/quic/quic_framer.h"
#include "net/quic/quic_packet_creator.h"
#include "net/quic/quic_packet_reader.h"
#include "net/tools/quic/quic_simple_client_session.h"
#include "net/tools/quic/quic_simple_client_stream.h"
......@@ -39,7 +40,8 @@ namespace test {
class QuicClientPeer;
} // namespace test
class QuicSimpleClient : public QuicDataStream::Visitor {
class QuicSimpleClient : public QuicDataStream::Visitor,
public QuicPacketReader::Visitor {
public:
class ResponseListener {
public:
......@@ -113,12 +115,11 @@ class QuicSimpleClient : public QuicDataStream::Visitor {
// Returns true if there are any outstanding requests.
bool WaitForEvents();
// Start the read loop on the socket.
void StartReading();
// Called on reads that complete asynchronously. Dispatches the packet and
// calls StartReading() again.
void OnReadComplete(int result);
// QuicPacketReader::Visitor
void OnReadError(int result) override;
bool OnPacket(const QuicEncryptedPacket& packet,
IPEndPoint local_address,
IPEndPoint peer_address) override;
// QuicDataStream::Visitor
void OnClose(QuicDataStream* stream) override;
......@@ -278,18 +279,11 @@ class QuicSimpleClient : public QuicDataStream::Visitor {
// Body of most recent response.
std::string latest_response_body_;
bool read_pending_;
// The number of iterations of the read loop that have completed synchronously
// and without posting a new task to the message loop.
int synchronous_read_count_;
// The target buffer of the current read.
scoped_refptr<IOBufferWithSize> read_buffer_;
// The log used for the sockets.
NetLog net_log_;
scoped_ptr<QuicPacketReader> packet_reader_;
base::WeakPtrFactory<QuicSimpleClient> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(QuicSimpleClient);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment