Commit 1f4179d3 authored by akalin@chromium.org's avatar akalin@chromium.org

[SPDY] Refactor SpdySession's write queue

This is in preparation for replacing the various IOBuffers used for reads/writes
with a single SpdyBuffer class.

Replace the priority queue of SpdyIOBufferProducers with a SpdyWriteQueue object,
which is an an array of FIFO queues binned by priority.
The priority queue was looking only at priority and so was not guaranteeing
FIFO behavior among producers with the same priority.

Remove the frame queue in SpdyStream and instead have it use the session's
write queue directly.

Remove unused fields from SpdyIOBuffer and clean it up.

Propagate and handle errors from SpdyCredentialBuilder::Build.

Rename SpdyIOBufferProducer to SpdyFrameProducer, have it return a SpdyFrame,
clean up its interface, and move the stream-activating logic out of it.

Replace uses of std::list with std::deque.

Steamline logic in SpdySession that deals with the write queue.

Convert some raw pointers to scoped_ptr<>.

BUG=176582

Review URL: https://codereview.chromium.org/13009012

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192975 0039d316-1c4b-4281-b951-d872f2087c98
parent 4cc3da80
......@@ -839,6 +839,8 @@
'spdy/spdy_credential_state.h',
'spdy/spdy_frame_builder.cc',
'spdy/spdy_frame_builder.h',
'spdy/spdy_frame_producer.cc',
'spdy/spdy_frame_producer.h',
'spdy/spdy_frame_reader.cc',
'spdy/spdy_frame_reader.h',
'spdy/spdy_framer.cc',
......@@ -864,6 +866,8 @@
'spdy/spdy_stream.h',
'spdy/spdy_websocket_stream.cc',
'spdy/spdy_websocket_stream.h',
'spdy/spdy_write_queue.cc',
'spdy/spdy_write_queue.h',
'ssl/client_cert_store.h',
'ssl/client_cert_store_impl.h',
'ssl/client_cert_store_impl_mac.cc',
......@@ -1663,6 +1667,7 @@
'spdy/spdy_websocket_test_util_spdy2.h',
'spdy/spdy_websocket_test_util_spdy3.cc',
'spdy/spdy_websocket_test_util_spdy3.h',
'spdy/spdy_write_queue_unittest.cc',
'ssl/client_cert_store_impl_unittest.cc',
'ssl/default_server_bound_cert_store_unittest.cc',
'ssl/openssl_client_key_store_unittest.cc',
......
// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/spdy/spdy_frame_producer.h"
#include "base/logging.h"
#include "net/spdy/spdy_protocol.h"
namespace net {
SpdyFrameProducer::SpdyFrameProducer() {}
SpdyFrameProducer::~SpdyFrameProducer() {}
SimpleFrameProducer::SimpleFrameProducer(scoped_ptr<SpdyFrame> frame)
: frame_(frame.Pass()) {}
SimpleFrameProducer::~SimpleFrameProducer() {}
scoped_ptr<SpdyFrame> SimpleFrameProducer::ProduceFrame() {
DCHECK(frame_);
return frame_.Pass();
}
} // namespace net
// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef NET_SPDY_SPDY_FRAME_PRODUCER_H_
#define NET_SPDY_SPDY_FRAME_PRODUCER_H_
#include "base/basictypes.h"
#include "base/compiler_specific.h"
#include "base/memory/scoped_ptr.h"
#include "net/base/net_export.h"
namespace net {
class SpdyFrame;
// An object which provides a SpdyFrame for writing. We pass these
// around instead of SpdyFrames since some frames have to be generated
// "just in time".
class NET_EXPORT_PRIVATE SpdyFrameProducer {
public:
SpdyFrameProducer();
// Produces the frame to be written. Will be called at most once.
virtual scoped_ptr<SpdyFrame> ProduceFrame() = 0;
virtual ~SpdyFrameProducer();
private:
DISALLOW_COPY_AND_ASSIGN(SpdyFrameProducer);
};
// A simple wrapper around a single SpdyFrame.
class NET_EXPORT_PRIVATE SimpleFrameProducer : public SpdyFrameProducer {
public:
explicit SimpleFrameProducer(scoped_ptr<SpdyFrame> frame);
virtual ~SimpleFrameProducer();
virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE;
private:
scoped_ptr<SpdyFrame> frame_;
DISALLOW_COPY_AND_ASSIGN(SimpleFrameProducer);
};
} // namespace net
#endif // NET_SPDY_SPDY_FRAME_PRODUCER_H_
......@@ -7,23 +7,13 @@
namespace net {
// static
uint64 SpdyIOBuffer::order_ = 0;
SpdyIOBuffer::SpdyIOBuffer() {}
SpdyIOBuffer::SpdyIOBuffer(
IOBuffer* buffer, int size, RequestPriority priority, SpdyStream* stream)
: buffer_(new DrainableIOBuffer(buffer, size)),
priority_(priority),
position_(++order_),
stream_(stream) {}
SpdyIOBuffer::SpdyIOBuffer() : priority_(HIGHEST), position_(0), stream_(NULL) {
}
SpdyIOBuffer::SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream)
: buffer_(new DrainableIOBuffer(buffer, size)), stream_(stream) {}
SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) {
buffer_ = rhs.buffer_;
priority_ = rhs.priority_;
position_ = rhs.position_;
stream_ = rhs.stream_;
}
......@@ -31,13 +21,16 @@ SpdyIOBuffer::~SpdyIOBuffer() {}
SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) {
buffer_ = rhs.buffer_;
priority_ = rhs.priority_;
position_ = rhs.position_;
stream_ = rhs.stream_;
return *this;
}
void SpdyIOBuffer::release() {
void SpdyIOBuffer::Swap(SpdyIOBuffer* other) {
buffer_.swap(other->buffer_);
stream_.swap(other->stream_);
}
void SpdyIOBuffer::Release() {
buffer_ = NULL;
stream_ = NULL;
}
......
......@@ -8,54 +8,47 @@
#include "base/memory/ref_counted.h"
#include "net/base/io_buffer.h"
#include "net/base/net_export.h"
#include "net/base/request_priority.h"
namespace net {
class SpdyStream;
// A class for managing SPDY IO buffers. These buffers need to be prioritized
// so that the SpdySession sends them in the right order. Further, they need
// to track the SpdyStream which they are associated with so that incremental
// completion of the IO can notify the appropriate stream of completion.
// A class for managing SPDY write buffers. These write buffers need
// to track the SpdyStream which they are associated with so that the
// session can activate the stream lazily and also notify the stream
// on completion of the write.
class NET_EXPORT_PRIVATE SpdyIOBuffer {
public:
SpdyIOBuffer();
// Constructor
// |buffer| is the actual data buffer.
// |size| is the size of the data buffer.
// |priority| is the priority of this buffer.
// |stream| is a pointer to the stream which is managing this buffer.
SpdyIOBuffer(IOBuffer* buffer, int size, RequestPriority priority,
SpdyStream* stream);
// |stream| is a pointer to the stream which is managing this buffer
// (can be NULL if the write is for the session itself).
SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream);
// Declare this instead of using the default so that we avoid needing to
// include spdy_stream.h.
SpdyIOBuffer(const SpdyIOBuffer& rhs);
SpdyIOBuffer();
~SpdyIOBuffer();
// Declare this instead of using the default so that we avoid needing to
// include spdy_stream.h.
SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs);
void Swap(SpdyIOBuffer* other);
void Release();
// Accessors.
DrainableIOBuffer* buffer() const { return buffer_; }
size_t size() const { return buffer_->size(); }
void release();
RequestPriority priority() const { return priority_; }
const scoped_refptr<SpdyStream>& stream() const { return stream_; }
// Comparison operator to support sorting.
bool operator<(const SpdyIOBuffer& other) const {
if (priority_ != other.priority_)
return priority_ < other.priority_;
return position_ > other.position_;
}
private:
scoped_refptr<DrainableIOBuffer> buffer_;
RequestPriority priority_;
uint64 position_;
scoped_refptr<SpdyStream> stream_;
static uint64 order_; // Maintains a FIFO order for equal priorities.
};
} // namespace net
......
This diff is collapsed.
......@@ -6,20 +6,20 @@
#define NET_SPDY_SPDY_SESSION_H_
#include <deque>
#include <list>
#include <map>
#include <queue>
#include <set>
#include <string>
#include "base/basictypes.h"
#include "base/gtest_prod_util.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/time.h"
#include "net/base/io_buffer.h"
#include "net/base/load_states.h"
#include "net/base/net_errors.h"
#include "net/base/net_export.h"
#include "net/base/request_priority.h"
#include "net/socket/client_socket_handle.h"
#include "net/socket/ssl_client_socket.h"
......@@ -30,6 +30,7 @@
#include "net/spdy/spdy_io_buffer.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_write_queue.h"
#include "net/ssl/ssl_client_cert_type.h"
#include "net/ssl/ssl_config_service.h"
......@@ -173,29 +174,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
FLOW_CONTROL_STREAM_AND_SESSION
};
// Defines an interface for producing SpdyIOBuffers.
class NET_EXPORT_PRIVATE SpdyIOBufferProducer {
public:
SpdyIOBufferProducer() {}
// Returns a newly created SpdyIOBuffer, owned by the caller, or NULL
// if not buffer is ready to be produced.
virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) = 0;
virtual RequestPriority GetPriority() const = 0;
virtual ~SpdyIOBufferProducer() {}
protected:
// Activates |spdy_stream| in |spdy_session|.
static void ActivateStream(SpdySession* spdy_session,
SpdyStream* spdy_stream);
static SpdyIOBuffer* CreateIOBuffer(SpdyFrame* frame,
RequestPriority priority,
SpdyStream* spdy_stream);
};
// Create a new SpdySession.
// |host_port_proxy_pair| is the host/port that this session connects to, and
// the proxy configuration settings that it's using.
......@@ -258,37 +236,41 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// authentication now.
bool VerifyDomainAuthentication(const std::string& domain);
// Records that |stream| has a write available from |producer|.
// |producer| will be owned by this SpdySession.
void SetStreamHasWriteAvailable(SpdyStream* stream,
SpdyIOBufferProducer* producer);
// Pushes the given producer into the write queue for
// |stream|. |stream| is guaranteed to be activated before the
// producer is used to produce its frame.
void EnqueueStreamWrite(SpdyStream* stream,
scoped_ptr<SpdyFrameProducer> producer);
// Send the SYN frame for |stream_id|. This also sends PING message to check
// the status of the connection.
SpdyFrame* CreateSynStream(
// Creates and returns a SYN frame for |stream_id|.
scoped_ptr<SpdyFrame> CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const SpdyHeaderBlock& headers);
// Write a CREDENTIAL frame to the session.
SpdyFrame* CreateCredentialFrame(const std::string& origin,
SSLClientCertType type,
const std::string& key,
const std::string& cert,
RequestPriority priority);
// Write a HEADERS frame to the stream.
SpdyFrame* CreateHeadersFrame(SpdyStreamId stream_id,
const SpdyHeaderBlock& headers,
SpdyControlFlags flags);
// Write a data frame to the stream.
// Used to create and queue a data frame for the given stream.
SpdyFrame* CreateDataFrame(SpdyStreamId stream_id,
net::IOBuffer* data, int len,
SpdyDataFlags flags);
// Tries to create a CREDENTIAL frame. If successful, fills in
// |credential_frame| and returns OK. Returns the error (guaranteed
// to not be ERR_IO_PENDING) otherwise.
int CreateCredentialFrame(const std::string& origin,
SSLClientCertType type,
const std::string& key,
const std::string& cert,
RequestPriority priority,
scoped_ptr<SpdyFrame>* credential_frame);
// Creates and returns a HEADERS frame.
scoped_ptr<SpdyFrame> CreateHeadersFrame(SpdyStreamId stream_id,
const SpdyHeaderBlock& headers,
SpdyControlFlags flags);
// Creates and returns a data frame. May return NULL if stalled by
// flow control.
scoped_ptr<SpdyFrame> CreateDataFrame(SpdyStreamId stream_id,
net::IOBuffer* data,
int len,
SpdyDataFlags flags);
// Close a stream.
void CloseStream(SpdyStreamId stream_id, int status);
......@@ -471,19 +453,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
std::pair<scoped_refptr<SpdyStream>, base::TimeTicks> > PushedStreamMap;
typedef std::set<scoped_refptr<SpdyStream> > CreatedStreamSet;
typedef std::map<SpdyIOBufferProducer*, SpdyStream*> StreamProducerMap;
class SpdyIOBufferProducerCompare {
public:
bool operator() (const SpdyIOBufferProducer* lhs,
const SpdyIOBufferProducer* rhs) const {
return lhs->GetPriority() < rhs->GetPriority();
}
};
typedef std::priority_queue<SpdyIOBufferProducer*,
std::vector<SpdyIOBufferProducer*>,
SpdyIOBufferProducerCompare> WriteQueue;
enum State {
IDLE,
......@@ -566,10 +535,16 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Get a new stream id.
int GetNewStreamId();
// Queue a frame for sending.
// |frame| is the frame to send.
// |priority| is the priority for insertion into the queue.
void QueueFrame(SpdyFrame* frame, RequestPriority priority);
// Pushes the given frame with the given priority into the write
// queue for the session.
void EnqueueSessionWrite(RequestPriority priority,
scoped_ptr<SpdyFrame> frame);
// Puts |producer| associated with |stream| onto the write queue
// with the given priority.
void EnqueueWrite(RequestPriority priority,
scoped_ptr<SpdyFrameProducer> producer,
const scoped_refptr<SpdyStream>& stream);
// Track active streams in the active stream list.
void ActivateStream(SpdyStream* stream);
......@@ -753,13 +728,8 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// Set of all created streams but that have not yet sent any frames.
CreatedStreamSet created_streams_;
// As streams have data to be sent, we put them into the write queue.
WriteQueue write_queue_;
// Mapping from SpdyIOBufferProducers to their corresponding SpdyStream
// so that when a stream is destroyed, we can remove the corresponding
// producer from |write_queue_|.
StreamProducerMap stream_producers_;
// The write queue.
SpdyWriteQueue write_queue_;
// The packet we are currently sending.
bool write_pending_; // Will be true when a write is in progress.
......
......@@ -103,39 +103,6 @@ class SpdySessionSpdy2Test : public PlatformTest {
HostPortProxyPair pair_;
};
// Test the SpdyIOBuffer class.
TEST_F(SpdySessionSpdy2Test, SpdyIOBuffer) {
std::priority_queue<SpdyIOBuffer> queue_;
const size_t kQueueSize = 100;
// Insert items with random priority and increasing buffer size.
for (size_t index = 0; index < kQueueSize; ++index) {
queue_.push(SpdyIOBuffer(
new IOBufferWithSize(index + 1),
index + 1,
static_cast<RequestPriority>(rand() % NUM_PRIORITIES),
NULL));
}
EXPECT_EQ(kQueueSize, queue_.size());
// Verify items come out with decreasing priority or FIFO order.
RequestPriority last_priority = NUM_PRIORITIES;
size_t last_size = 0;
for (size_t index = 0; index < kQueueSize; ++index) {
SpdyIOBuffer buffer = queue_.top();
EXPECT_LE(buffer.priority(), last_priority);
if (buffer.priority() < last_priority)
last_size = 0;
EXPECT_LT(last_size, buffer.size());
last_priority = buffer.priority();
last_size = buffer.size();
queue_.pop();
}
EXPECT_EQ(0u, queue_.size());
}
TEST_F(SpdySessionSpdy2Test, GoAway) {
session_deps_.host_resolver->set_synchronous_mode(true);
......
......@@ -120,39 +120,6 @@ class SpdySessionSpdy3Test : public PlatformTest {
HostPortProxyPair pair_;
};
// Test the SpdyIOBuffer class.
TEST_F(SpdySessionSpdy3Test, SpdyIOBuffer) {
std::priority_queue<SpdyIOBuffer> queue_;
const size_t kQueueSize = 100;
// Insert items with random priority and increasing buffer size.
for (size_t index = 0; index < kQueueSize; ++index) {
queue_.push(SpdyIOBuffer(
new IOBufferWithSize(index + 1),
index + 1,
static_cast<RequestPriority>(rand() % NUM_PRIORITIES),
NULL));
}
EXPECT_EQ(kQueueSize, queue_.size());
// Verify items come out with decreasing priority or FIFO order.
RequestPriority last_priority = NUM_PRIORITIES;
size_t last_size = 0;
for (size_t index = 0; index < kQueueSize; ++index) {
SpdyIOBuffer buffer = queue_.top();
EXPECT_LE(buffer.priority(), last_priority);
if (buffer.priority() < last_priority)
last_size = 0;
EXPECT_LT(last_size, buffer.size());
last_priority = buffer.priority();
last_size = buffer.size();
queue_.pop();
}
EXPECT_EQ(0u, queue_.size());
}
TEST_F(SpdySessionSpdy3Test, GoAway) {
session_deps_.host_resolver->set_synchronous_mode(true);
......
This diff is collapsed.
......@@ -5,7 +5,7 @@
#ifndef NET_SPDY_SPDY_STREAM_H_
#define NET_SPDY_SPDY_STREAM_H_
#include <list>
#include <deque>
#include <string>
#include <vector>
......@@ -102,15 +102,6 @@ class NET_EXPORT_PRIVATE SpdyStream
TYPE_DATA
};
// Structure to contains pending frame information.
typedef struct {
FrameType type;
union {
SpdyHeaderBlock* header_block;
SpdyFrame* data_frame;
};
} PendingFrame;
// SpdyStream constructor
SpdyStream(SpdySession* session,
const std::string& path,
......@@ -299,7 +290,8 @@ class NET_EXPORT_PRIVATE SpdyStream
int GetProtocolVersion() const;
private:
class SpdyStreamIOBufferProducer;
class SynStreamFrameProducer;
class HeaderFrameProducer;
enum State {
STATE_NONE,
......@@ -346,13 +338,14 @@ class NET_EXPORT_PRIVATE SpdyStream
// the MessageLoop to replay all the data that the server has already sent.
void PushedStreamReplayData();
// Informs the SpdySession that this stream has a write available.
void SetHasWriteAvailable();
// Produces the SYN_STREAM frame for the stream. The stream must
// already be activated.
scoped_ptr<SpdyFrame> ProduceSynStreamFrame();
// Returns a newly created SPDY frame owned by the called that contains
// the next frame to be sent by this frame. May return NULL if this
// stream has become stalled on flow control.
scoped_ptr<SpdyFrame> ProduceNextFrame();
// Produce the initial HEADER frame for the stream with the given
// block. The stream must already be activated.
scoped_ptr<SpdyFrame> ProduceHeaderFrame(
scoped_ptr<SpdyHeaderBlock> header_block);
// If the stream is active and stream flow control is turned on,
// called by OnDataReceived (which is in turn called by the session)
......@@ -398,14 +391,13 @@ class NET_EXPORT_PRIVATE SpdyStream
scoped_ptr<SpdyHeaderBlock> response_;
base::Time response_time_;
// An in order list of pending frame data that are going to be sent. HEADERS
// frames are queued as SpdyHeaderBlock structures because these must be
// compressed just before sending. Data frames are queued as SpdyDataFrame.
std::list<PendingFrame> pending_frames_;
// An in order list of sending frame types. It will be used to know which type
// of frame is sent and which callback should be invoked in OnOpen().
std::list<FrameType> waiting_completions_;
// An in order list of sending frame types. Used communicate to the
// delegate which type of frame was sent in DoOpen().
//
// TODO(akalin): We can remove the need for this queue if we add an
// OnFrameSent() callback to SpdyFrameProducer and have the session
// call that instead of SpdyStream::OnWriteComplete().
std::deque<FrameType> waiting_completions_;
State io_state_;
......
// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/spdy/spdy_write_queue.h"
#include <cstddef>
#include "base/logging.h"
#include "net/spdy/spdy_frame_producer.h"
#include "net/spdy/spdy_stream.h"
namespace net {
SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
SpdyWriteQueue::PendingWrite::PendingWrite(
SpdyFrameProducer* frame_producer,
const scoped_refptr<SpdyStream>& stream)
: frame_producer(frame_producer),
stream(stream) {}
SpdyWriteQueue::PendingWrite::~PendingWrite() {}
SpdyWriteQueue::SpdyWriteQueue() {}
SpdyWriteQueue::~SpdyWriteQueue() {
Clear();
}
void SpdyWriteQueue::Enqueue(RequestPriority priority,
scoped_ptr<SpdyFrameProducer> frame_producer,
const scoped_refptr<SpdyStream>& stream) {
if (stream.get()) {
DCHECK_EQ(stream->priority(), priority);
}
queue_[priority].push_back(PendingWrite(frame_producer.release(), stream));
}
bool SpdyWriteQueue::Dequeue(scoped_ptr<SpdyFrameProducer>* frame_producer,
scoped_refptr<SpdyStream>* stream) {
for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
if (!queue_[i].empty()) {
PendingWrite pending_write = queue_[i].front();
queue_[i].pop_front();
frame_producer->reset(pending_write.frame_producer);
*stream = pending_write.stream;
return true;
}
}
return false;
}
void SpdyWriteQueue::RemovePendingWritesForStream(
const scoped_refptr<SpdyStream>& stream) {
DCHECK(stream.get());
if (DCHECK_IS_ON()) {
// |stream| should not have pending writes in a queue not matching
// its priority.
for (int i = 0; i < NUM_PRIORITIES; ++i) {
if (stream->priority() == i)
continue;
for (std::deque<PendingWrite>::const_iterator it = queue_[i].begin();
it != queue_[i].end(); ++it) {
DCHECK_NE(it->stream, stream);
}
}
}
// Do the actual deletion and removal, preserving FIFO-ness.
std::deque<PendingWrite>* queue = &queue_[stream->priority()];
std::deque<PendingWrite>::iterator out_it = queue->begin();
for (std::deque<PendingWrite>::const_iterator it = queue->begin();
it != queue->end(); ++it) {
if (it->stream == stream) {
delete it->frame_producer;
} else {
*out_it = *it;
++out_it;
}
}
queue->erase(out_it, queue->end());
}
void SpdyWriteQueue::Clear() {
for (int i = 0; i < NUM_PRIORITIES; ++i) {
for (std::deque<PendingWrite>::iterator it = queue_[i].begin();
it != queue_[i].end(); ++it) {
delete it->frame_producer;
}
queue_[i].clear();
}
}
} // namespace net
// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef NET_SPDY_SPDY_WRITE_QUEUE_H_
#define NET_SPDY_SPDY_WRITE_QUEUE_H_
#include <deque>
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "net/base/net_export.h"
#include "net/base/request_priority.h"
namespace net {
class SpdyFrameProducer;
class SpdyStream;
// A queue of SpdyFrameProducers to produce frames to write. Ordered
// by priority, and then FIFO.
class NET_EXPORT_PRIVATE SpdyWriteQueue {
public:
SpdyWriteQueue();
~SpdyWriteQueue();
// Enqueues the given frame producer at the given priority
// associated with the given stream, which may be NULL if the frame
// producer is not associated with a stream. If |stream| is
// non-NULL, its priority must be equal to |priority|.
void Enqueue(RequestPriority priority,
scoped_ptr<SpdyFrameProducer> frame_producer,
const scoped_refptr<SpdyStream>& stream);
// Dequeues the frame producer with the highest priority that was
// enqueued the earliest and its associated stream. Returns true and
// fills in |frame_producer| and |stream| if successful --
// otherwise, just returns false.
bool Dequeue(scoped_ptr<SpdyFrameProducer>* frame_producer,
scoped_refptr<SpdyStream>* stream);
// Removes all pending writes for the given stream, which must be
// non-NULL.
void RemovePendingWritesForStream(const scoped_refptr<SpdyStream>& stream);
// Removes all pending writes.
void Clear();
private:
// A struct holding a frame producer and its associated stream.
struct PendingWrite {
// This has to be a raw pointer since we store this in an STL
// container.
SpdyFrameProducer* frame_producer;
scoped_refptr<SpdyStream> stream;
PendingWrite();
PendingWrite(SpdyFrameProducer* frame_producer,
const scoped_refptr<SpdyStream>& stream);
~PendingWrite();
};
// The actual write queue, binned by priority.
std::deque<PendingWrite> queue_[NUM_PRIORITIES];
DISALLOW_COPY_AND_ASSIGN(SpdyWriteQueue);
};
} // namespace net
#endif // NET_SPDY_SPDY_WRITE_QUEUE_H_
// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/spdy/spdy_write_queue.h"
#include <cstddef>
#include <cstring>
#include <string>
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/strings/string_number_conversions.h"
#include "net/base/net_log.h"
#include "net/base/request_priority.h"
#include "net/spdy/spdy_frame_producer.h"
#include "net/spdy/spdy_stream.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace net {
namespace {
class SpdyWriteQueueTest : public ::testing::Test {};
// Makes a SpdyFrameProducer producing a frame with the data in the
// given string.
scoped_ptr<SpdyFrameProducer> StringToProducer(const std::string& s) {
scoped_ptr<char[]> data(new char[s.size()]);
std::memcpy(data.get(), s.data(), s.size());
return scoped_ptr<SpdyFrameProducer>(
new SimpleFrameProducer(
scoped_ptr<SpdyFrame>(
new SpdyFrame(data.release(), s.size(), true))));
}
// Makes a SpdyFrameProducer producing a frame with the data in the
// given int (converted to a string).
scoped_ptr<SpdyFrameProducer> IntToProducer(int i) {
return StringToProducer(base::IntToString(i));
}
// Produces a frame with the given producer and returns a copy of its
// data as a string.
std::string ProducerToString(scoped_ptr<SpdyFrameProducer> producer) {
scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
return std::string(frame->data(), frame->size());
}
// Produces a frame with the given producer and returns a copy of its
// data as an int (converted from a string).
int ProducerToInt(scoped_ptr<SpdyFrameProducer> producer) {
int i = 0;
EXPECT_TRUE(base::StringToInt(ProducerToString(producer.Pass()), &i));
return i;
}
// Makes a SpdyStream with the given priority and a NULL SpdySession
// -- be careful to not call any functions that expect the session to
// be there.
SpdyStream* MakeTestStream(RequestPriority priority) {
return new SpdyStream(NULL, "", priority, 0, 0, false, BoundNetLog());
}
// Add some frame producers of different priority. The producers
// should be dequeued in priority order with their associated stream.
TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
SpdyWriteQueue write_queue;
scoped_ptr<SpdyFrameProducer> producer_low = StringToProducer("LOW");
scoped_ptr<SpdyFrameProducer> producer_medium = StringToProducer("MEDIUM");
scoped_ptr<SpdyFrameProducer> producer_highest = StringToProducer("HIGHEST");
// A NULL stream should still work.
scoped_refptr<SpdyStream> stream_low(NULL);
scoped_refptr<SpdyStream> stream_medium(MakeTestStream(MEDIUM));
scoped_refptr<SpdyStream> stream_highest(MakeTestStream(HIGHEST));
write_queue.Enqueue(LOW, producer_low.Pass(), stream_low);
write_queue.Enqueue(MEDIUM, producer_medium.Pass(), stream_medium);
write_queue.Enqueue(HIGHEST, producer_highest.Pass(), stream_highest);
scoped_ptr<SpdyFrameProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ("HIGHEST", ProducerToString(frame_producer.Pass()));
EXPECT_EQ(stream_highest, stream);
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ("MEDIUM", ProducerToString(frame_producer.Pass()));
EXPECT_EQ(stream_medium, stream);
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ("LOW", ProducerToString(frame_producer.Pass()));
EXPECT_EQ(stream_low, stream);
EXPECT_FALSE(write_queue.Dequeue(&frame_producer, &stream));
}
// Add some frame producers with the same priority. The producers
// should be dequeued in FIFO order with their associated stream.
TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
SpdyWriteQueue write_queue;
scoped_ptr<SpdyFrameProducer> producer1 = IntToProducer(1);
scoped_ptr<SpdyFrameProducer> producer2 = IntToProducer(2);
scoped_ptr<SpdyFrameProducer> producer3 = IntToProducer(3);
scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
scoped_refptr<SpdyStream> stream3(MakeTestStream(DEFAULT_PRIORITY));
write_queue.Enqueue(DEFAULT_PRIORITY, producer1.Pass(), stream1);
write_queue.Enqueue(DEFAULT_PRIORITY, producer2.Pass(), stream2);
write_queue.Enqueue(DEFAULT_PRIORITY, producer3.Pass(), stream3);
scoped_ptr<SpdyFrameProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ(1, ProducerToInt(frame_producer.Pass()));
EXPECT_EQ(stream1, stream);
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ(2, ProducerToInt(frame_producer.Pass()));
EXPECT_EQ(stream2, stream);
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ(3, ProducerToInt(frame_producer.Pass()));
EXPECT_EQ(stream3, stream);
EXPECT_FALSE(write_queue.Dequeue(&frame_producer, &stream));
}
// Enqueue a bunch of writes and then call
// RemovePendingWritesForStream() on one of the streams. No dequeued
// write should be for that stream.
TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
SpdyWriteQueue write_queue;
scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
for (int i = 0; i < 100; ++i) {
scoped_refptr<SpdyStream> stream = ((i % 3) == 0) ? stream1 : stream2;
write_queue.Enqueue(DEFAULT_PRIORITY, IntToProducer(i), stream);
}
write_queue.RemovePendingWritesForStream(stream2);
for (int i = 0; i < 100; i += 3) {
scoped_ptr<SpdyFrameProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_producer, &stream));
EXPECT_EQ(i, ProducerToInt(frame_producer.Pass()));
EXPECT_EQ(stream1, stream);
}
scoped_ptr<SpdyFrameProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_producer, &stream));
}
// Enqueue a bunch of writes and then call Clear(). The write queue
// should clean up the memory properly, and Dequeue() should return
// false.
TEST_F(SpdyWriteQueueTest, Clear) {
SpdyWriteQueue write_queue;
for (int i = 0; i < 100; ++i) {
write_queue.Enqueue(DEFAULT_PRIORITY, IntToProducer(i), NULL);
}
write_queue.Clear();
scoped_ptr<SpdyFrameProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_producer, &stream));
}
}
} // namespace net
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment