Commit f702d57b authored by rch@chromium.org's avatar rch@chromium.org

Add a QuicHttpStream class.


Review URL: https://chromiumcodereview.appspot.com/11364068

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@170968 0039d316-1c4b-4281-b951-d872f2087c98
parent f9cf5577
......@@ -35,6 +35,7 @@ class NET_EXPORT_PRIVATE HttpStreamBase {
virtual ~HttpStreamBase() {}
// Initialize stream. Must be called before calling SendRequest().
// |request_info| must outlive the HttpStreamBase.
// Returns a net error code, possibly ERR_IO_PENDING.
virtual int InitializeStream(const HttpRequestInfo* request_info,
const BoundNetLog& net_log,
......@@ -44,6 +45,7 @@ class NET_EXPORT_PRIVATE HttpStreamBase {
// ERR_IO_PENDING is returned if the operation could not be completed
// synchronously, in which case the result will be passed to the callback
// when available. Returns OK on success.
// |response| must outlive the HttpStreamBase.
virtual int SendRequest(const HttpRequestHeaders& request_headers,
HttpResponseInfo* response,
const CompletionCallback& callback) = 0;
......@@ -89,6 +91,9 @@ class NET_EXPORT_PRIVATE HttpStreamBase {
// the response headers indicate either chunked encoding or content length.
// If neither is sent, the server must close the connection for us to detect
// the end of the response.
// TODO(rch): Rename this method, so that it is clear why it exists
// particularly as it applies to QUIC and SPDY for which the end of the
// response is always findable.
virtual bool CanFindEndOfResponse() const = 0;
// A stream exists on top of a connection. If the connection has been used
......
......@@ -684,6 +684,8 @@
'quic/quic_fec_group.h',
'quic/quic_framer.cc',
'quic/quic_framer.h',
'quic/quic_http_stream.cc',
'quic/quic_http_stream.h',
'quic/quic_packet_creator.cc',
'quic/quic_packet_creator.h',
'quic/quic_protocol.cc',
......@@ -1447,6 +1449,7 @@
'quic/quic_crypto_stream_test.cc',
'quic/quic_fec_group_test.cc',
'quic/quic_framer_test.cc',
'quic/quic_http_stream_test.cc',
'quic/quic_packet_creator_test.cc',
'quic/quic_reliable_client_stream_test.cc',
'quic/quic_session_test.cc',
......
......@@ -14,6 +14,7 @@ QuicClientSession::QuicClientSession(QuicConnection* connection)
}
QuicClientSession::~QuicClientSession() {
STLDeleteValues(&streams_);
}
QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() {
......@@ -28,6 +29,7 @@ QuicReliableClientStream* QuicClientSession::CreateOutgoingReliableStream() {
}
QuicReliableClientStream* stream =
new QuicReliableClientStream(GetNextStreamId(), this);
streams_[stream->id()] = stream;
ActivateStream(stream);
return stream;
......@@ -56,6 +58,18 @@ ReliableQuicStream* QuicClientSession::CreateIncomingReliableStream(
return NULL;
}
void QuicClientSession::CloseStream(QuicStreamId stream_id) {
QuicSession::CloseStream(stream_id);
StreamMap::iterator it = streams_.find(stream_id);
DCHECK(it != streams_.end());
if (it != streams_.end()) {
ReliableQuicStream* stream = it->second;
streams_.erase(it);
delete stream;
}
}
void QuicClientSession::OnCryptoHandshakeComplete(QuicErrorCode error) {
if (!callback_.is_null()) {
callback_.Run(error == QUIC_NO_ERROR ? OK : ERR_UNEXPECTED);
......
......@@ -7,6 +7,7 @@
#ifndef NET_QUIC_QUIC_CLIENT_SESSION_H_
#define NET_QUIC_QUIC_CLIENT_SESSION_H_
#include "base/hash_tables.h"
#include "net/base/completion_callback.h"
#include "net/quic/quic_crypto_client_stream.h"
#include "net/quic/quic_reliable_client_stream.h"
......@@ -23,6 +24,7 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession {
// QuicSession methods:
virtual QuicReliableClientStream* CreateOutgoingReliableStream() OVERRIDE;
virtual QuicCryptoClientStream* GetCryptoStream() OVERRIDE;
virtual void CloseStream(QuicStreamId stream_id) OVERRIDE;
virtual void OnCryptoHandshakeComplete(QuicErrorCode error) OVERRIDE;
// Perform a crypto handshake with the server.
......@@ -34,7 +36,9 @@ class NET_EXPORT_PRIVATE QuicClientSession : public QuicSession {
QuicStreamId id) OVERRIDE;
private:
typedef base::hash_map<QuicStreamId, ReliableQuicStream*> StreamMap;
QuicCryptoClientStream crypto_stream_;
StreamMap streams_;
CompletionCallback callback_;
......
......@@ -60,17 +60,14 @@ TEST_F(QuicClientSessionTest, MaxNumConnections) {
std::vector<QuicReliableClientStream*> streams;
for (size_t i = 0; i < kDefaultMaxStreamsPerConnection; i++) {
QuicReliableClientStream* stream = session_.CreateOutgoingReliableStream();
streams.push_back(stream);
EXPECT_TRUE(stream);
streams.push_back(stream);
}
EXPECT_FALSE(session_.CreateOutgoingReliableStream());
// Close a stream and ensure I can now open a new one.
session_.CloseStream(streams[0]->id());
scoped_ptr<QuicReliableClientStream> stream(
session_.CreateOutgoingReliableStream());
EXPECT_TRUE(stream.get());
STLDeleteElements(&streams);
EXPECT_TRUE(session_.CreateOutgoingReliableStream());
}
} // namespace
......
......@@ -66,7 +66,6 @@ void QuicConnectionHelper::SetResendAlarm(
}
void QuicConnectionHelper::SetSendAlarm(QuicTime::Delta delay) {
DCHECK(!send_alarm_registered_);
send_alarm_registered_ = true;
task_runner_->PostDelayedTask(
FROM_HERE,
......
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/quic_http_stream.h"
#include "base/stringprintf.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
#include "net/quic/quic_client_session.h"
#include "net/quic/quic_reliable_client_stream.h"
#include "net/quic/quic_utils.h"
#include "net/socket/next_proto.h"
#include "net/spdy/spdy_framer.h"
namespace net {
static const size_t kHeaderBufInitialSize = 4096;
QuicHttpStream::QuicHttpStream(QuicReliableClientStream* stream)
: io_state_(STATE_NONE),
stream_(stream),
request_info_(NULL),
request_body_stream_(NULL),
response_info_(NULL),
response_headers_received_(false),
read_buf_(new GrowableIOBuffer()),
user_buffer_len_(0),
ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) {
DCHECK(stream_);
stream_->SetDelegate(this);
}
QuicHttpStream::~QuicHttpStream() {
Close(false);
}
int QuicHttpStream::InitializeStream(const HttpRequestInfo* request_info,
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
CHECK(stream_);
request_info_ = request_info;
return OK;
}
int QuicHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
HttpResponseInfo* response,
const CompletionCallback& callback) {
CHECK(stream_);
CHECK(!request_body_stream_);
CHECK(!response_info_);
CHECK(!callback.is_null());
CHECK(response);
// Store the serialized request headers.
// TODO(rch): use SPDY serialization
std::string path = HttpUtil::PathForRequest(request_info_->url);
std::string first_line = base::StringPrintf("%s %s HTTP/1.1\r\n",
request_info_->method.c_str(),
path.c_str());
request_ = first_line + request_headers.ToString();
// Store the request body.
request_body_stream_ = request_info_->upload_data_stream;
if (request_body_stream_ && (request_body_stream_->size() ||
request_body_stream_->is_chunked())) {
// Use kMaxPacketSize as the buffer size, since the request
// body data is written with this size at a time.
// TODO(rch): use a smarter value since we can't write an entire
// packet due to overhead.
raw_request_body_buf_ = new IOBufferWithSize(kMaxPacketSize);
// The request body buffer is empty at first.
request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, 0);
}
// Store the response info.
response_info_ = response;
io_state_ = STATE_SEND_HEADERS;
int rv = DoLoop(OK);
if (rv == ERR_IO_PENDING)
callback_ = callback;
return rv > 0 ? OK : rv;
}
UploadProgress QuicHttpStream::GetUploadProgress() const {
if (!request_body_stream_)
return UploadProgress();
return UploadProgress(request_body_stream_->position(),
request_body_stream_->size());
}
int QuicHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
CHECK(!callback.is_null());
// Check if we already have the response headers. If so, return synchronously.
if (response_headers_received_) {
return OK;
}
// Still waiting for the response, return IO_PENDING.
CHECK(callback_.is_null());
callback_ = callback;
return ERR_IO_PENDING;
}
const HttpResponseInfo* QuicHttpStream::GetResponseInfo() const {
return response_info_;
}
int QuicHttpStream::ReadResponseBody(
IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
CHECK(buf);
CHECK(buf_len);
CHECK(!callback.is_null());
// If we have data buffered, complete the IO immediately.
if (!response_body_.empty()) {
int bytes_read = 0;
while (!response_body_.empty() && buf_len > 0) {
scoped_refptr<IOBufferWithSize> data = response_body_.front();
const int bytes_to_copy = std::min(buf_len, data->size());
memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
buf_len -= bytes_to_copy;
if (bytes_to_copy == data->size()) {
response_body_.pop_front();
} else {
const int bytes_remaining = data->size() - bytes_to_copy;
IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
bytes_remaining);
response_body_.pop_front();
response_body_.push_front(make_scoped_refptr(new_buffer));
}
bytes_read += bytes_to_copy;
}
return bytes_read;
}
if (!stream_) {
// If the stream is already closed, there is no body to read.
return 0;
}
CHECK(callback_.is_null());
CHECK(!user_buffer_);
CHECK_EQ(0, user_buffer_len_);
callback_ = callback;
user_buffer_ = buf;
user_buffer_len_ = buf_len;
return ERR_IO_PENDING;
}
void QuicHttpStream::Close(bool not_reusable) {
// Note: the not_reusable flag has no meaning for SPDY streams.
if (stream_) {
stream_->Close(QUIC_NO_ERROR);
}
}
HttpStream* QuicHttpStream::RenewStreamForAuth() {
return NULL;
}
bool QuicHttpStream::IsResponseBodyComplete() const {
return io_state_ == STATE_OPEN && !stream_;
}
bool QuicHttpStream::CanFindEndOfResponse() const {
return true;
}
bool QuicHttpStream::IsMoreDataBuffered() const {
return false;
}
bool QuicHttpStream::IsConnectionReused() const {
// TODO(rch): do something smarter here.
return stream_ && stream_->id() > 1;
}
void QuicHttpStream::SetConnectionReused() {
// QUIC doesn't need an indicator here.
}
bool QuicHttpStream::IsConnectionReusable() const {
// QUIC streams aren't considered reusable.
return false;
}
void QuicHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
DCHECK(stream_);
NOTIMPLEMENTED();
}
void QuicHttpStream::GetSSLCertRequestInfo(
SSLCertRequestInfo* cert_request_info) {
DCHECK(stream_);
NOTIMPLEMENTED();
}
bool QuicHttpStream::IsSpdyHttpStream() const {
return false;
}
void QuicHttpStream::Drain(HttpNetworkSession* session) {
if (stream_)
stream_->Close(QUIC_NO_ERROR);
delete this;
}
int QuicHttpStream::OnSendData() {
// TODO(rch): Change QUIC IO to provide notifications to the streams.
NOTREACHED();
return OK;
}
int QuicHttpStream::OnSendDataComplete(int status, bool* eof) {
// TODO(rch): Change QUIC IO to provide notifications to the streams.
NOTREACHED();
return OK;
}
int QuicHttpStream::OnDataReceived(const char* data, int length) {
// Are we still reading the response headers.
if (!response_headers_received_) {
// Grow the read buffer if necessary.
if (read_buf_->RemainingCapacity() < length) {
read_buf_->SetCapacity(read_buf_->capacity() + kHeaderBufInitialSize);
}
memcpy(read_buf_->data(), data, length);
read_buf_->set_offset(read_buf_->offset() + length);
int rv = ParseResponseHeaders();
if (rv != ERR_IO_PENDING && !callback_.is_null()) {
DoCallback(rv);
}
return OK;
}
if (callback_.is_null()) {
BufferResponseBody(data, length);
return OK;
}
if (length <= user_buffer_len_) {
memcpy(user_buffer_->data(), data, length);
} else {
memcpy(user_buffer_->data(), data, user_buffer_len_);
int delta = length - user_buffer_len_;
BufferResponseBody(data + user_buffer_len_, delta);
}
user_buffer_ = NULL;
user_buffer_len_ = 0;
DoCallback(length);
return OK;
}
void QuicHttpStream::OnClose(QuicErrorCode error) {
// TOOD(rch): find better errors.
int status = error == QUIC_NO_ERROR && response_headers_received_ ?
OK : ERR_ABORTED;
stream_ = NULL;
if (!callback_.is_null())
DoCallback(status);
}
void QuicHttpStream::OnIOComplete(int rv) {
rv = DoLoop(rv);
if (rv != ERR_IO_PENDING && !callback_.is_null()) {
DoCallback(rv);
}
}
void QuicHttpStream::DoCallback(int rv) {
CHECK_NE(rv, ERR_IO_PENDING);
CHECK(!callback_.is_null());
// The client callback can do anything, including destroying this class,
// so any pending callback must be issued after everything else is done.
CompletionCallback c = callback_;
callback_.Reset();
c.Run(rv);
}
int QuicHttpStream::DoLoop(int rv) {
do {
switch (io_state_) {
case STATE_SEND_HEADERS:
CHECK_EQ(OK, rv);
rv = DoSendHeaders();
break;
case STATE_SEND_HEADERS_COMPLETE:
rv = DoSendHeadersComplete(rv);
break;
case STATE_READ_REQUEST_BODY:
CHECK_EQ(OK, rv);
rv = DoReadRequestBody();
break;
case STATE_READ_REQUEST_BODY_COMPLETE:
rv = DoReadRequestBodyComplete(rv);
break;
case STATE_SEND_BODY:
CHECK_EQ(OK, rv);
rv = DoSendBody();
break;
case STATE_SEND_BODY_COMPLETE:
rv = DoSendBodyComplete(rv);
break;
case STATE_OPEN:
CHECK_EQ(OK, rv);
break;
default:
NOTREACHED() << "io_state_: " << io_state_;
break;
}
} while (io_state_ != STATE_NONE && io_state_ != STATE_OPEN &&
rv != ERR_IO_PENDING);
return rv;
}
int QuicHttpStream::DoSendHeaders() {
if (!stream_)
return ERR_UNEXPECTED;
bool has_upload_data = request_body_stream_ != NULL;
io_state_ = STATE_SEND_HEADERS_COMPLETE;
int rv = stream_->WriteData(request_, !has_upload_data);
return rv;
}
int QuicHttpStream::DoSendHeadersComplete(int rv) {
if (rv < 0) {
io_state_ = STATE_NONE;
return rv;
}
io_state_ = request_body_stream_ ?
STATE_READ_REQUEST_BODY : STATE_OPEN;
return OK;
}
int QuicHttpStream::DoReadRequestBody() {
io_state_ = STATE_READ_REQUEST_BODY_COMPLETE;
return request_body_stream_->Read(raw_request_body_buf_,
raw_request_body_buf_->size(),
base::Bind(&QuicHttpStream::OnIOComplete,
weak_factory_.GetWeakPtr()));
}
int QuicHttpStream::DoReadRequestBodyComplete(int rv) {
// |rv| is the result of read from the request body from the last call to
// DoSendBody().
if (rv < 0) {
io_state_ = STATE_NONE;
return rv;
}
request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, rv);
if (rv == 0) { // Reached the end.
DCHECK(request_body_stream_->IsEOF());
}
io_state_ = STATE_SEND_BODY;
return OK;
}
int QuicHttpStream::DoSendBody() {
if (!stream_)
return ERR_UNEXPECTED;
CHECK(request_body_stream_);
CHECK(request_body_buf_);
const bool eof = request_body_stream_->IsEOF();
int len = request_body_buf_->BytesRemaining();
if (len > 0 || eof) {
base::StringPiece data(request_body_buf_->data(), len);
int rv = stream_->WriteData(data, eof);
request_body_buf_->DidConsume(rv);
DCHECK_NE(ERR_IO_PENDING, rv);
if (eof) {
io_state_ = STATE_OPEN;
return OK;
}
return rv;
}
io_state_ = STATE_SEND_BODY_COMPLETE;
return OK;
}
int QuicHttpStream::DoSendBodyComplete(int rv) {
if (rv < 0) {
io_state_ = STATE_NONE;
return rv;
}
io_state_ = STATE_READ_REQUEST_BODY;
return OK;
}
int QuicHttpStream::ParseResponseHeaders() {
int end_offset = HttpUtil::LocateEndOfHeaders(read_buf_->StartOfBuffer(),
read_buf_->offset(), 0);
if (end_offset == -1) {
return ERR_IO_PENDING;
}
if (!stream_)
return ERR_UNEXPECTED;
scoped_refptr<HttpResponseHeaders> headers = new HttpResponseHeaders(
HttpUtil::AssembleRawHeaders(read_buf_->StartOfBuffer(), end_offset));
// Put the peer's IP address and port into the response.
IPEndPoint address = stream_->GetPeerAddress();
response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
response_info_->headers = headers;
response_info_->vary_data.Init(*request_info_, *response_info_->headers);
response_headers_received_ = true;
// Save the remaining received data.
int delta = read_buf_->offset() - end_offset;
if (delta > 0) {
BufferResponseBody(read_buf_->data(), delta);
}
return OK;
}
void QuicHttpStream::BufferResponseBody(const char* data, int length) {
IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
memcpy(io_buffer->data(), data, length);
response_body_.push_back(make_scoped_refptr(io_buffer));
}
} // namespace net
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef NET_QUIC_QUIC_HTTP_STREAM_H_
#define NET_QUIC_QUIC_HTTP_STREAM_H_
#include <list>
#include "base/memory/weak_ptr.h"
#include "net/base/io_buffer.h"
#include "net/http/http_stream.h"
#include "net/quic/quic_reliable_client_stream.h"
namespace net {
// The QuicHttpStream is a QUIC-specific HttpStream subclass. It holds a
// non-owning pointer to a QuicReliableClientStream which it uses to
// send and receive data.
class NET_EXPORT_PRIVATE QuicHttpStream :
public QuicReliableClientStream::Delegate,
public HttpStream {
public:
explicit QuicHttpStream(QuicReliableClientStream* stream);
virtual ~QuicHttpStream();
// HttpStream implementation.
virtual int InitializeStream(const HttpRequestInfo* request_info,
const BoundNetLog& net_log,
const CompletionCallback& callback) OVERRIDE;
virtual int SendRequest(const HttpRequestHeaders& request_headers,
HttpResponseInfo* response,
const CompletionCallback& callback) OVERRIDE;
virtual UploadProgress GetUploadProgress() const OVERRIDE;
virtual int ReadResponseHeaders(const CompletionCallback& callback) OVERRIDE;
virtual const HttpResponseInfo* GetResponseInfo() const OVERRIDE;
virtual int ReadResponseBody(IOBuffer* buf,
int buf_len,
const CompletionCallback& callback) OVERRIDE;
virtual void Close(bool not_reusable) OVERRIDE;
virtual HttpStream* RenewStreamForAuth() OVERRIDE;
virtual bool IsResponseBodyComplete() const OVERRIDE;
virtual bool CanFindEndOfResponse() const OVERRIDE;
virtual bool IsMoreDataBuffered() const OVERRIDE;
virtual bool IsConnectionReused() const OVERRIDE;
virtual void SetConnectionReused() OVERRIDE;
virtual bool IsConnectionReusable() const OVERRIDE;
virtual void GetSSLInfo(SSLInfo* ssl_info) OVERRIDE;
virtual void GetSSLCertRequestInfo(
SSLCertRequestInfo* cert_request_info) OVERRIDE;
virtual bool IsSpdyHttpStream() const OVERRIDE;
virtual void LogNumRttVsBytesMetrics() const OVERRIDE {}
virtual void Drain(HttpNetworkSession* session) OVERRIDE;
// QuicReliableClientStream::Delegate implementation
virtual int OnSendData() OVERRIDE;
virtual int OnSendDataComplete(int status, bool* eof) OVERRIDE;
virtual int OnDataReceived(const char* data, int length) OVERRIDE;
virtual void OnClose(QuicErrorCode error) OVERRIDE;
private:
enum State {
STATE_NONE,
STATE_SEND_HEADERS,
STATE_SEND_HEADERS_COMPLETE,
STATE_READ_REQUEST_BODY,
STATE_READ_REQUEST_BODY_COMPLETE,
STATE_SEND_BODY,
STATE_SEND_BODY_COMPLETE,
STATE_OPEN,
};
void OnIOComplete(int rv);
void DoCallback(int rv);
int DoLoop(int);
int DoSendHeaders();
int DoSendHeadersComplete(int rv);
int DoReadRequestBody();
int DoReadRequestBodyComplete(int rv);
int DoSendBody();
int DoSendBodyComplete(int rv);
int DoReadResponseHeaders();
int DoReadResponseHeadersComplete(int rv);
int ParseResponseHeaders();
void BufferResponseBody(const char* data, int length);
State io_state_;
QuicReliableClientStream* stream_; // Non-owning.
// The following three fields are all owned by the caller and must
// outlive this object, according to the HttpStream contract.
// The request to send.
const HttpRequestInfo* request_info_;
// The request body to send, if any, owned by the caller.
UploadDataStream* request_body_stream_;
// |response_info_| is the HTTP response data object which is filled in
// when a the response headers are read. It is not owned by this stream.
HttpResponseInfo* response_info_;
bool response_headers_received_;
// Serialized HTTP request.
std::string request_;
// Buffer into which response header data is read.
scoped_refptr<GrowableIOBuffer> read_buf_;
// We buffer the response body as it arrives asynchronously from the stream.
// TODO(rch): This is infinite buffering, which is bad.
std::list<scoped_refptr<IOBufferWithSize> > response_body_;
// The caller's callback to be used for asynchronous operations.
CompletionCallback callback_;
// Caller provided buffer for the ReadResponseBody() response.
scoped_refptr<IOBuffer> user_buffer_;
int user_buffer_len_;
// Temporary buffer used to read the request body from UploadDataStream.
scoped_refptr<IOBufferWithSize> raw_request_body_buf_;
// Wraps raw_request_body_buf_ to read the remaining data progressively.
scoped_refptr<DrainableIOBuffer> request_body_buf_;
base::WeakPtrFactory<QuicHttpStream> weak_factory_;
};
} // namespace net
#endif // NET_QUIC_QUIC_HTTP_STREAM_H_
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/quic_http_stream.h"
#include <vector>
#include "net/base/net_errors.h"
#include "net/base/test_completion_callback.h"
#include "net/base/upload_data.h"
#include "net/base/upload_data_stream.h"
#include "net/http/http_response_headers.h"
#include "net/quic/quic_client_session.h"
#include "net/quic/quic_connection.h"
#include "net/quic/quic_connection_helper.h"
#include "net/quic/test_tools/mock_clock.h"
#include "net/quic/test_tools/quic_test_utils.h"
#include "net/quic/test_tools/test_task_runner.h"
#include "net/socket/socket_test_util.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
using testing::_;
namespace net {
class QuicConnectionPeer {
public:
static void SetScheduler(QuicConnection* connection,
QuicSendScheduler* scheduler) {
connection->scheduler_.reset(scheduler);
}
};
namespace test {
namespace {
const char kUploadData[] = "hello world!";
class TestQuicConnection : public QuicConnection {
public:
TestQuicConnection(QuicGuid guid,
IPEndPoint address,
QuicConnectionHelper* helper)
: QuicConnection(guid, address, helper) {
}
void SetScheduler(QuicSendScheduler* scheduler) {
QuicConnectionPeer::SetScheduler(this, scheduler);
}
};
} // namespace
class QuicHttpStreamTest : public ::testing::Test {
protected:
const static bool kFin = true;
const static bool kNoFin = false;
// Holds a packet to be written to the wire, and the IO mode that should
// be used by the mock socket when performing the write.
struct PacketToWrite {
PacketToWrite(IoMode mode, QuicEncryptedPacket* packet)
: mode(mode),
packet(packet) {
}
IoMode mode;
QuicEncryptedPacket* packet;
};
QuicHttpStreamTest()
: net_log_(BoundNetLog()),
read_buffer_(new IOBufferWithSize(4096)),
guid_(2),
framer_(QuicDecrypter::Create(kNULL), QuicEncrypter::Create(kNULL)),
creator_(guid_, &framer_) {
IPAddressNumber ip;
CHECK(ParseIPLiteralToNumber("192.0.2.33", &ip));
peer_addr_ = IPEndPoint(ip, 443);
self_addr_ = IPEndPoint(ip, 8435);
// Do null initialization for simple tests.
Initialize();
}
~QuicHttpStreamTest() {
for (size_t i = 0; i < writes_.size(); i++) {
delete writes_[i].packet;
}
}
// Adds a packet to the list of expected writes.
void AddWrite(IoMode mode, QuicEncryptedPacket* packet) {
writes_.push_back(PacketToWrite(mode, packet));
}
// Returns the packet to be written at position |pos|.
QuicEncryptedPacket* GetWrite(size_t pos) {
return writes_[pos].packet;
}
bool AtEof() {
return socket_data_->at_read_eof() && socket_data_->at_write_eof();
}
void ProcessPacket(const QuicEncryptedPacket& packet) {
connection_->ProcessUdpPacket(self_addr_, peer_addr_, packet);
}
// Configures the test fixture to use the list of expected writes.
void Initialize() {
mock_writes_.reset(new MockWrite[writes_.size()]);
for (size_t i = 0; i < writes_.size(); i++) {
mock_writes_[i] = MockWrite(writes_[i].mode,
writes_[i].packet->data(),
writes_[i].packet->length());
};
socket_data_.reset(new StaticSocketDataProvider(NULL, 0, mock_writes_.get(),
writes_.size()));
socket_.reset(new MockUDPClientSocket(socket_data_.get(),
net_log_.net_log()));
socket_->Connect(peer_addr_);
runner_ = new TestTaskRunner(&clock_);
scheduler_ = new MockScheduler();
EXPECT_CALL(*scheduler_, TimeUntilSend(_)).
WillRepeatedly(testing::Return(QuicTime::Delta()));
connection_ = new TestQuicConnection(
guid_, peer_addr_, new QuicConnectionHelper(
runner_.get(), &clock_, socket_.get()));
connection_->set_visitor(&visitor_);
connection_->SetScheduler(scheduler_);
session_.reset(new QuicClientSession(connection_));
CryptoHandshakeMessage message;
message.tag = kSHLO;
session_->GetCryptoStream()->OnHandshakeMessage(message);
EXPECT_TRUE(session_->IsCryptoHandshakeComplete());
stream_.reset(new QuicHttpStream(session_->CreateOutgoingReliableStream()));
}
// Returns a newly created packet to send kData on stream 1.
QuicEncryptedPacket* ConstructDataPacket(
QuicPacketSequenceNumber sequence_number,
bool fin,
QuicStreamOffset offset,
base::StringPiece data) {
InitializeHeader(sequence_number);
QuicStreamFrame frame(3, fin, offset, data);
return ConstructPacket(header_, QuicFrame(&frame));
}
// Returns a newly created packet to send ack data.
QuicEncryptedPacket* ConstructAckPacket(
QuicPacketSequenceNumber sequence_number,
QuicPacketSequenceNumber largest_received,
QuicPacketSequenceNumber least_unacked) {
InitializeHeader(sequence_number);
QuicAckFrame ack(largest_received, QuicTime(), least_unacked);
ack.congestion_info.type = kFixRate;
ack.congestion_info.fix_rate.bitrate_in_bytes_per_second = 100000;
return ConstructPacket(header_, QuicFrame(&ack));
}
// Returns a newly created packet to send a connection close frame.
QuicEncryptedPacket* ConstructClosePacket(
QuicPacketSequenceNumber sequence_number,
bool with_congestion_info) {
InitializeHeader(sequence_number);
QuicFrames frames;
QuicAckFrame ack(0, QuicTime(), 0);
if (with_congestion_info) {
ack.congestion_info.type = kFixRate;
ack.congestion_info.fix_rate.bitrate_in_bytes_per_second = 100000;
} else {
ack.congestion_info.type = kNone;
}
QuicConnectionCloseFrame close;
close.error_code = QUIC_CONNECTION_TIMED_OUT;
close.ack_frame = ack;
return ConstructPacket(header_, QuicFrame(&close));
}
BoundNetLog net_log_;
MockScheduler* scheduler_;
scoped_refptr<TestTaskRunner> runner_;
scoped_array<MockWrite> mock_writes_;
MockClock clock_;
TestQuicConnection* connection_;
testing::StrictMock<MockConnectionVisitor> visitor_;
scoped_ptr<QuicHttpStream> stream_;
scoped_ptr<QuicClientSession> session_;
TestCompletionCallback callback_;
HttpRequestInfo request_;
HttpRequestHeaders headers_;
HttpResponseInfo response_;
scoped_refptr<IOBufferWithSize> read_buffer_;
private:
void InitializeHeader(QuicPacketSequenceNumber sequence_number) {
header_.guid = guid_;
header_.packet_sequence_number = sequence_number;
header_.flags = PACKET_FLAGS_NONE;
header_.fec_group = 0;
}
QuicEncryptedPacket* ConstructPacket(const QuicPacketHeader& header,
const QuicFrame& frame) {
QuicFrames frames;
frames.push_back(frame);
QuicPacket* packet;
framer_.ConstructFrameDataPacket(header_, frames, &packet);
QuicEncryptedPacket* encrypted = framer_.EncryptPacket(*packet);
delete packet;
return encrypted;
}
const QuicGuid guid_;
QuicFramer framer_;
IPEndPoint self_addr_;
IPEndPoint peer_addr_;
QuicPacketCreator creator_;
QuicPacketHeader header_;
scoped_ptr<MockUDPClientSocket> socket_;
scoped_ptr<StaticSocketDataProvider> socket_data_;
std::vector<PacketToWrite> writes_;
};
TEST_F(QuicHttpStreamTest, RenewStreamForAuth) {
EXPECT_EQ(NULL, stream_->RenewStreamForAuth());
}
TEST_F(QuicHttpStreamTest, CanFindEndOfResponse) {
EXPECT_TRUE(stream_->CanFindEndOfResponse());
}
TEST_F(QuicHttpStreamTest, IsMoreDataBuffered) {
EXPECT_FALSE(stream_->IsMoreDataBuffered());
}
TEST_F(QuicHttpStreamTest, IsConnectionReusable) {
EXPECT_FALSE(stream_->IsConnectionReusable());
}
TEST_F(QuicHttpStreamTest, GetRequest) {
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0,
"GET / HTTP/1.1\r\n\r\n"));
AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 0));
Initialize();
request_.method = "GET";
request_.url = GURL("http://www.google.com/");
EXPECT_EQ(OK, stream_->InitializeStream(&request_, net_log_,
callback_.callback()));
EXPECT_EQ(OK, stream_->SendRequest(headers_, &response_,
callback_.callback()));
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack the request.
scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 0));
ProcessPacket(*ack);
EXPECT_EQ(ERR_IO_PENDING,
stream_->ReadResponseHeaders(callback_.callback()));
// Send the response without a body.
const char kResponseHeaders[] = "HTTP/1.1 404 OK\r\n"
"Content-Type: text/plain\r\n\r\n";
scoped_ptr<QuicEncryptedPacket> resp(
ConstructDataPacket(2, kFin, 0, kResponseHeaders));
ProcessPacket(*resp);
// Now that the headers have been processed, the callback will return.
EXPECT_EQ(OK, callback_.WaitForResult());
ASSERT_TRUE(response_.headers != NULL);
EXPECT_EQ(404, response_.headers->response_code());
EXPECT_TRUE(response_.headers->HasHeaderValue("Content-Type", "text/plain"));
// There is no body, so this should return immediately.
EXPECT_EQ(0, stream_->ReadResponseBody(read_buffer_.get(),
read_buffer_->size(),
callback_.callback()));
EXPECT_TRUE(stream_->IsResponseBodyComplete());
EXPECT_TRUE(AtEof());
}
TEST_F(QuicHttpStreamTest, GetRequestFullResponseInSinglePacket) {
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kFin, 0,
"GET / HTTP/1.1\r\n\r\n"));
AddWrite(SYNCHRONOUS, ConstructAckPacket(2, 2, 0));
Initialize();
request_.method = "GET";
request_.url = GURL("http://www.google.com/");
EXPECT_EQ(OK, stream_->InitializeStream(&request_, net_log_,
callback_.callback()));
EXPECT_EQ(OK, stream_->SendRequest(headers_, &response_,
callback_.callback()));
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack the request.
scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 1, 0));
ProcessPacket(*ack);
EXPECT_EQ(ERR_IO_PENDING,
stream_->ReadResponseHeaders(callback_.callback()));
// Send the response with a body.
const char kResponseHeaders[] = "HTTP/1.1 404 OK\r\n"
"Content-Type: text/plain\r\n\r\nhello world!";
scoped_ptr<QuicEncryptedPacket> resp(
ConstructDataPacket(2, kFin, 0, kResponseHeaders));
ProcessPacket(*resp);
// Now that the headers have been processed, the callback will return.
EXPECT_EQ(OK, callback_.WaitForResult());
ASSERT_TRUE(response_.headers != NULL);
EXPECT_EQ(404, response_.headers->response_code());
EXPECT_TRUE(response_.headers->HasHeaderValue("Content-Type", "text/plain"));
// There is no body, so this should return immediately.
// Since the body has already arrived, this should return immediately.
EXPECT_EQ(12, stream_->ReadResponseBody(read_buffer_.get(),
read_buffer_->size(),
callback_.callback()));
EXPECT_TRUE(stream_->IsResponseBodyComplete());
EXPECT_TRUE(AtEof());
}
TEST_F(QuicHttpStreamTest, SendPostRequest) {
const char kRequestData[] = "POST / HTTP/1.1\r\n\r\n";
AddWrite(SYNCHRONOUS, ConstructDataPacket(1, kNoFin, 0, kRequestData));
AddWrite(SYNCHRONOUS, ConstructDataPacket(2, kFin, strlen(kRequestData),
kUploadData));
AddWrite(SYNCHRONOUS, ConstructAckPacket(3, 2, 0));
AddWrite(SYNCHRONOUS, ConstructAckPacket(4, 3, 0));
Initialize();
UploadData* upload_data = new UploadData();
upload_data->AppendBytes(kUploadData, strlen(kUploadData));
UploadDataStream upload_data_stream(upload_data);
request_.method = "POST";
request_.url = GURL("http://www.google.com/");
request_.upload_data_stream = &upload_data_stream;
ASSERT_EQ(OK, request_.upload_data_stream->InitSync());
EXPECT_EQ(OK, stream_->InitializeStream(&request_, net_log_,
callback_.callback()));
EXPECT_EQ(OK, stream_->SendRequest(headers_, &response_,
callback_.callback()));
EXPECT_EQ(&response_, stream_->GetResponseInfo());
// Ack both packets in the request.
scoped_ptr<QuicEncryptedPacket> ack(ConstructAckPacket(1, 2, 1));
ProcessPacket(*ack);
// Send the response headers (but not the body).
const char kResponseHeaders[] = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n\r\n";
scoped_ptr<QuicEncryptedPacket> resp(
ConstructDataPacket(2, kNoFin, 0, kResponseHeaders));
ProcessPacket(*resp);
// Since the headers have already arrived, this should return immediately.
EXPECT_EQ(OK, stream_->ReadResponseHeaders(callback_.callback()));
ASSERT_TRUE(response_.headers != NULL);
EXPECT_EQ(200, response_.headers->response_code());
EXPECT_TRUE(response_.headers->HasHeaderValue("Content-Type", "text/plain"));
// Send the response body.
const char kResponseBody[] = "Hello world!";
scoped_ptr<QuicEncryptedPacket> resp_body(
ConstructDataPacket(3, kFin, strlen(kResponseHeaders), kResponseBody));
ProcessPacket(*resp_body);
// Since the body has already arrived, this should return immediately.
EXPECT_EQ(static_cast<int>(strlen(kResponseBody)),
stream_->ReadResponseBody(read_buffer_.get(), read_buffer_->size(),
callback_.callback()));
EXPECT_TRUE(stream_->IsResponseBodyComplete());
EXPECT_TRUE(AtEof());
}
} // namespace test
} // namespace net
......@@ -261,7 +261,7 @@ struct NET_EXPORT_PRIVATE QuicAckFrame {
struct NET_EXPORT_PRIVATE QuicRstStreamFrame {
QuicRstStreamFrame() {}
QuicRstStreamFrame(QuicStreamId stream_id, uint64 offset,
QuicErrorCode error_code)
QuicErrorCode error_code)
: stream_id(stream_id), offset(offset), error_code(error_code) {
DCHECK_LE(error_code, std::numeric_limits<uint8>::max());
}
......
......@@ -5,19 +5,26 @@
#include "net/quic/quic_reliable_client_stream.h"
#include "net/base/net_errors.h"
#include "net/quic/quic_session.h"
namespace net {
QuicReliableClientStream::QuicReliableClientStream(QuicStreamId id,
QuicSession* session)
: ReliableQuicStream(id, session) {
: ReliableQuicStream(id, session),
delegate_(NULL) {
}
QuicReliableClientStream::~QuicReliableClientStream() {
if (delegate_) {
delegate_->OnClose(error());
}
}
uint32 QuicReliableClientStream::ProcessData(const char* data,
uint32 data_len) {
// TODO(rch): buffer data if we don't have a delegate.
DCHECK(delegate_);
int rv = delegate_->OnDataReceived(data, data_len);
if (rv != OK) {
DLOG(ERROR) << "Delegate refused data, rv: " << rv;
......@@ -28,11 +35,15 @@ uint32 QuicReliableClientStream::ProcessData(const char* data,
}
void QuicReliableClientStream::TerminateFromPeer(bool half_close) {
delegate_->OnClose(error());
if (delegate_) {
delegate_->OnClose(error());
delegate_ = NULL;
}
}
void QuicReliableClientStream::SetDelegate(
QuicReliableClientStream::Delegate* delegate) {
DCHECK((!delegate_ && delegate) || (delegate_ && !delegate));
delegate_ = delegate;
}
......
......@@ -7,6 +7,7 @@
#ifndef NET_QUIC_QUIC_RELIABLE_CLIENT_STREAM_H_
#define NET_QUIC_QUIC_RELIABLE_CLIENT_STREAM_H_
#include "net/base/ip_endpoint.h"
#include "net/base/upload_data_stream.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"
......@@ -17,10 +18,12 @@ namespace net {
class QuicClientSession;
// A client-initiated ReliableQuicStream. Instances of this class
// are owned by the QuicClientSession which created them.
class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream {
public:
// Delegate handles protocol specific behavior of a quic stream.
class Delegate {
class NET_EXPORT_PRIVATE Delegate {
public:
Delegate() {}
......@@ -57,6 +60,7 @@ class NET_EXPORT_PRIVATE QuicReliableClientStream : public ReliableQuicStream {
// ReliableQuicStream
virtual uint32 ProcessData(const char* data, uint32 data_len) OVERRIDE;
virtual void TerminateFromPeer(bool half_close) OVERRIDE;
using ReliableQuicStream::WriteData;
// Set new |delegate|. |delegate| must not be NULL.
// If this stream has already received data, OnDataReceived() will be
......
......@@ -52,6 +52,7 @@ TEST_F(QuicReliableClientStreamTest, TerminateFromPeer) {
TEST_F(QuicReliableClientStreamTest, ProcessData) {
const char data[] = "hello world!";
EXPECT_CALL(delegate_, OnDataReceived(StrEq(data), arraysize(data)));
EXPECT_CALL(delegate_, OnClose(QUIC_NO_ERROR));
EXPECT_EQ(arraysize(data), stream_.ProcessData(data, arraysize(data)));
}
......@@ -61,6 +62,8 @@ TEST_F(QuicReliableClientStreamTest, ProcessDataWithError) {
EXPECT_CALL(delegate_,
OnDataReceived(StrEq(data),
arraysize(data))).WillOnce(Return(ERR_UNEXPECTED));
EXPECT_CALL(delegate_, OnClose(QUIC_BAD_APPLICATION_PAYLOAD));
EXPECT_EQ(0u, stream_.ProcessData(data, arraysize(data)));
}
......
......@@ -90,18 +90,22 @@ bool ReliableQuicStream::HasBytesToRead() const {
return sequencer_.HasBytesToRead();
}
const IPEndPoint& ReliableQuicStream::GetPeerAddress() const {
return session_->peer_address();
}
int ReliableQuicStream::WriteData(StringPiece data, bool fin) {
if (write_side_closed_) {
DLOG(ERROR) << "Attempt to write when the write side is closed";
return 0;
}
session()->WriteData(id(), data, offset_, fin);
int rv = session()->WriteData(id(), data, offset_, fin);
offset_ += data.length();
if (fin) {
CloseWriteSide();
}
return data.length();
return rv;
}
void ReliableQuicStream::CloseReadSide() {
......@@ -112,6 +116,7 @@ void ReliableQuicStream::CloseReadSide() {
read_side_closed_ = true;
if (write_side_closed_) {
DLOG(INFO) << "Closing stream: " << id();
session_->CloseStream(id());
}
}
......@@ -124,6 +129,7 @@ void ReliableQuicStream::CloseWriteSide() {
write_side_closed_ = true;
if (read_side_closed_) {
DLOG(INFO) << "Closing stream: " << id();
session_->CloseStream(id());
}
}
......
......@@ -11,6 +11,7 @@
namespace net {
class IPEndPoint;
class QuicSession;
// All this does right now is send data to subclasses via the sequencer.
......@@ -55,6 +56,8 @@ class NET_EXPORT_PRIVATE ReliableQuicStream {
bool read_side_closed() const { return read_side_closed_; }
bool write_side_closed() const { return write_side_closed_; }
const IPEndPoint& GetPeerAddress() const;
protected:
virtual int WriteData(base::StringPiece data, bool fin);
// Close the read side of the socket. Further frames will not be accepted.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment