Commit 5f2dd387 authored by miu's avatar miu Committed by Commit bot

[Cast] Limit frames in flight by duration, and not by number of frames.

FrameSender now decides whether to drop frames based on the media
duration in-flight (instead of the number of frames in-flight).  The
maximum allowed in-flight duration is the target playout delay plus the
one-way network trip time, the latter of which accounts for the time it
takes for an ACK to travel back from the receiver.
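
For illustration, a minimal standalone sketch of this budget (the new
GetAllowedInFlightMediaDuration() in the diff below approximates the one-way
trip as half of the measured round-trip time; std::chrono stands in here for
base::TimeDelta):

  #include <chrono>

  // Allowed in-flight media: the playout delay window plus the time for an
  // ACK to travel back (approximated as RTT / 2), per the patch below.
  std::chrono::microseconds AllowedInFlightMediaDuration(
      std::chrono::microseconds target_playout_delay,
      std::chrono::microseconds current_round_trip_time) {
    return target_playout_delay + current_round_trip_time / 2;
  }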

ShouldDropNextFrame() still limits by the number of frames in-flight,
but only so that a client cannot flood the FrameSender with frames at a
rate greater than the configured maximum frame rate.  Some burstiness is
allowed to mitigate false-positive detections.  In a typical session,
frames should never be dropped based upon the in-flight count.
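
A rough sketch of those two count-based guards (a hypothetical standalone
function; max_unacked_frames and max_frame_burst mirror the kMaxUnackedFrames
and kMaxFrameBurst limits used in the new ShouldDropNextFrame() below, and the
other parameters are assumed inputs):

  // Returns true if accepting another frame would exceed the hard design
  // limit, or the configured frame rate plus a short-term burst allowance.
  bool WouldExceedFrameCountLimits(int frames_in_flight,
                                   double max_frame_rate,
                                   double in_flight_duration_seconds,
                                   int max_unacked_frames,  // design limit
                                   int max_frame_burst) {   // burst slack
    if (frames_in_flight >= max_unacked_frames)
      return true;
    const double max_frames_for_duration =
        max_frame_rate * in_flight_duration_seconds;
    return frames_in_flight >= max_frames_for_duration + max_frame_burst;
  }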

This change also alters the design of PacketStorage: instead of having a
fixed "large enough for anything" capacity, it now holds only the frames
that are in-flight.  This is needed because we no longer limit by the
number of frames in-flight, so there is no way to determine how large
"large enough for anything" would have to be.  It is now the duty of
FrameSender to "cancel out" acknowledged frames, and of RtpSender to
release them from storage.
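
A simplified sketch of that release bookkeeping (standalone types assumed; in
the patch, FrameSender cancels acked frames via RtpSender::CancelSendingFrames(),
which in turn calls PacketStorage::ReleaseFrame() shown in the diff below):

  #include <cstdint>
  #include <deque>
  #include <vector>

  // Released frames become empty "zombie" entries until they reach the front
  // of the deque, keeping frame IDs consecutive for the 8-bit lookup.
  struct PacketStorageSketch {
    std::deque<std::vector<int>> frames;  // One packet list per stored frame.
    uint32_t first_frame_id = 0;
    size_t zombie_count = 0;

    void ReleaseFrame(uint32_t frame_id) {
      const uint32_t offset = frame_id - first_frame_id;
      if (static_cast<int32_t>(offset) < 0 || offset >= frames.size() ||
          frames[offset].empty()) {
        return;  // Unknown or already-released frame.
      }
      frames[offset].clear();
      ++zombie_count;
      // Pop released frames off the front so only in-flight frames remain.
      while (!frames.empty() && frames.front().empty()) {
        --zombie_count;
        frames.pop_front();
        ++first_frame_id;
      }
    }
  };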

BUG=404813

Review URL: https://codereview.chromium.org/560223002

Cr-Commit-Position: refs/heads/master@{#295857}
parent 4d5b336e
......@@ -84,14 +84,12 @@ TEST_F(CastTransportHostFilterTest, SimpleMessages) {
FakeSend(new_msg);
media::cast::CastTransportRtpConfig audio_config;
audio_config.stored_frames = 10;
audio_config.ssrc = 1;
audio_config.feedback_ssrc = 2;
CastHostMsg_InitializeAudio init_audio_msg(kChannelId, audio_config);
FakeSend(init_audio_msg);
media::cast::CastTransportRtpConfig video_config;
video_config.stored_frames = 10;
video_config.ssrc = 11;
video_config.feedback_ssrc = 12;
CastHostMsg_InitializeVideo init_video_msg(kChannelId, video_config);
......
......@@ -47,7 +47,6 @@ IPC_STRUCT_TRAITS_BEGIN(media::cast::CastTransportRtpConfig)
IPC_STRUCT_TRAITS_MEMBER(ssrc)
IPC_STRUCT_TRAITS_MEMBER(feedback_ssrc)
IPC_STRUCT_TRAITS_MEMBER(rtp_payload_type)
IPC_STRUCT_TRAITS_MEMBER(stored_frames)
IPC_STRUCT_TRAITS_MEMBER(aes_key)
IPC_STRUCT_TRAITS_MEMBER(aes_iv_mask)
IPC_STRUCT_TRAITS_END()
......
......@@ -8,7 +8,7 @@ namespace media {
namespace cast {
CastTransportRtpConfig::CastTransportRtpConfig()
: ssrc(0), feedback_ssrc(0), rtp_payload_type(0), stored_frames(0) {}
: ssrc(0), feedback_ssrc(0), rtp_payload_type(0) {}
CastTransportRtpConfig::~CastTransportRtpConfig() {}
......
......@@ -41,10 +41,6 @@ struct CastTransportRtpConfig {
// RTP payload type enum: Specifies the type/encoding of frame data.
int rtp_payload_type;
// The number of most-recent frames that must be stored in the transport
// layer, to facilitate re-transmissions.
int stored_frames;
// The AES crypto key and initialization vector. Each of these strings
// contains the data in binary form, of size kAesKeySize. If they are empty
// strings, crypto is not being used.
......
......@@ -110,7 +110,6 @@ class CastTransportSenderImplTest : public ::testing::Test {
rtp_config.ssrc = kVideoSsrc;
rtp_config.feedback_ssrc = 2;
rtp_config.rtp_payload_type = 3;
rtp_config.stored_frames = 10;
transport_sender_->InitializeVideo(rtp_config,
RtcpCastMessageCallback(),
RtcpRttCallback());
......@@ -121,7 +120,6 @@ class CastTransportSenderImplTest : public ::testing::Test {
rtp_config.ssrc = kAudioSsrc;
rtp_config.feedback_ssrc = 3;
rtp_config.rtp_payload_type = 4;
rtp_config.stored_frames = 10;
transport_sender_->InitializeAudio(rtp_config,
RtcpCastMessageCallback(),
RtcpRttCallback());
......
......@@ -4,47 +4,59 @@
#include "media/cast/net/rtp/packet_storage.h"
#include <string>
#include "base/logging.h"
#include "media/cast/cast_defines.h"
namespace media {
namespace cast {
PacketStorage::PacketStorage(size_t stored_frames)
: max_stored_frames_(stored_frames),
first_frame_id_in_list_(0),
last_frame_id_in_list_(0) {
PacketStorage::PacketStorage()
: first_frame_id_in_list_(0),
zombie_count_(0) {
}
PacketStorage::~PacketStorage() {
}
bool PacketStorage::IsValid() const {
return max_stored_frames_ > 0 &&
static_cast<int>(max_stored_frames_) <= kMaxUnackedFrames;
}
size_t PacketStorage::GetNumberOfStoredFrames() const {
return frames_.size();
return frames_.size() - zombie_count_;
}
void PacketStorage::StoreFrame(uint32 frame_id,
const SendPacketVector& packets) {
if (packets.empty()) {
NOTREACHED();
return;
}
if (frames_.empty()) {
first_frame_id_in_list_ = frame_id;
} else {
// Make sure frame IDs are consecutive.
DCHECK_EQ(last_frame_id_in_list_ + 1, frame_id);
DCHECK_EQ(first_frame_id_in_list_ + static_cast<uint32>(frames_.size()),
frame_id);
// Make sure we aren't being asked to store more frames than the system's
// design limit.
DCHECK_LT(frames_.size(), static_cast<size_t>(kMaxUnackedFrames));
}
// Save new frame to the end of the list.
last_frame_id_in_list_ = frame_id;
frames_.push_back(packets);
}
void PacketStorage::ReleaseFrame(uint32 frame_id) {
const uint32 offset = frame_id - first_frame_id_in_list_;
if (static_cast<int32>(offset) < 0 || offset >= frames_.size() ||
frames_[offset].empty()) {
return;
}
frames_[offset].clear();
++zombie_count_;
// Evict the oldest frame if the list is too long.
if (frames_.size() > max_stored_frames_) {
while (!frames_.empty() && frames_.front().empty()) {
DCHECK_GT(zombie_count_, 0u);
--zombie_count_;
frames_.pop_front();
++first_frame_id_in_list_;
}
......@@ -57,7 +69,8 @@ const SendPacketVector* PacketStorage::GetFrame8(uint8 frame_id_8bits) const {
index_8bits = frame_id_8bits - index_8bits;
if (index_8bits >= frames_.size())
return NULL;
return &(frames_[index_8bits]);
const SendPacketVector& packets = frames_[index_8bits];
return packets.empty() ? NULL : &packets;
}
} // namespace cast
......
......@@ -6,37 +6,24 @@
#define MEDIA_CAST_NET_RTP_SENDER_PACKET_STORAGE_PACKET_STORAGE_H_
#include <deque>
#include <list>
#include <map>
#include <vector>
#include "base/basictypes.h"
#include "base/memory/linked_ptr.h"
#include "base/memory/scoped_ptr.h"
#include "base/time/tick_clock.h"
#include "base/time/time.h"
#include "media/cast/net/cast_transport_config.h"
#include "media/cast/net/cast_transport_defines.h"
#include "media/cast/net/pacing/paced_sender.h"
namespace media {
namespace cast {
// Stores a list of frames. Each frame consists of a list of packets.
typedef std::deque<SendPacketVector> FrameQueue;
class PacketStorage {
public:
explicit PacketStorage(size_t stored_frames);
PacketStorage();
virtual ~PacketStorage();
// Returns true if this class is configured correctly.
// (stored frames > 0 && stored_frames < kMaxStoredFrames)
bool IsValid() const;
// Store all of the packets for a frame.
void StoreFrame(uint32 frame_id, const SendPacketVector& packets);
// Release all of the packets for a frame.
void ReleaseFrame(uint32 frame_id);
// Returns a list of packets for a frame indexed by an 8-bit ID.
// It is the lowest 8 bits of a frame ID.
// Returns NULL if the frame cannot be found.
......@@ -46,10 +33,12 @@ class PacketStorage {
size_t GetNumberOfStoredFrames() const;
private:
const size_t max_stored_frames_;
FrameQueue frames_;
std::deque<SendPacketVector> frames_;
uint32 first_frame_id_in_list_;
uint32 last_frame_id_in_list_;
// The number of frames whose packets have been released, but whose entries in
// the |frames_| queue have not yet been popped.
size_t zombie_count_;
DISALLOW_COPY_AND_ASSIGN(PacketStorage);
};
......
......@@ -6,16 +6,18 @@
#include <stdint.h>
#include <algorithm>
#include <vector>
#include "base/test/simple_test_tick_clock.h"
#include "base/time/time.h"
#include "media/cast/cast_defines.h"
#include "testing/gmock/include/gmock/gmock.h"
namespace media {
namespace cast {
static size_t kStoredFrames = 10;
static const size_t kStoredFrames = 10;
// Generate |number_of_frames| and store into |*storage|.
// First frame has 1 packet, second frame has 2 packets, etc.
......@@ -41,20 +43,22 @@ static void StoreFrames(size_t number_of_frames,
}
TEST(PacketStorageTest, NumberOfStoredFrames) {
PacketStorage storage(kStoredFrames);
PacketStorage storage;
uint32 frame_id = 0;
frame_id = ~frame_id; // The maximum value of uint32.
StoreFrames(200, frame_id, &storage);
EXPECT_EQ(kStoredFrames, storage.GetNumberOfStoredFrames());
StoreFrames(kMaxUnackedFrames / 2, frame_id, &storage);
EXPECT_EQ(static_cast<size_t>(kMaxUnackedFrames / 2),
storage.GetNumberOfStoredFrames());
}
TEST(PacketStorageTest, GetFrameWrapAround8bits) {
PacketStorage storage(kStoredFrames);
PacketStorage storage;
const uint32 kFirstFrameId = 250;
StoreFrames(kStoredFrames, kFirstFrameId, &storage);
EXPECT_EQ(kStoredFrames, storage.GetNumberOfStoredFrames());
EXPECT_EQ(std::min<size_t>(kMaxUnackedFrames, kStoredFrames),
storage.GetNumberOfStoredFrames());
// Expect we get the correct frames by looking at the number of
// packets.
......@@ -67,12 +71,13 @@ TEST(PacketStorageTest, GetFrameWrapAround8bits) {
}
TEST(PacketStorageTest, GetFrameWrapAround32bits) {
PacketStorage storage(kStoredFrames);
PacketStorage storage;
// First frame ID is close to the maximum value of uint32.
uint32 first_frame_id = 0xffffffff - 5;
StoreFrames(kStoredFrames, first_frame_id, &storage);
EXPECT_EQ(kStoredFrames, storage.GetNumberOfStoredFrames());
EXPECT_EQ(std::min<size_t>(kMaxUnackedFrames, kStoredFrames),
storage.GetNumberOfStoredFrames());
// Expect we get the correct frames by looking at the number of
// packets.
......@@ -84,29 +89,38 @@ TEST(PacketStorageTest, GetFrameWrapAround32bits) {
}
}
TEST(PacketStorageTest, GetFrameTooOld) {
PacketStorage storage(kStoredFrames);
TEST(PacketStorageTest, FramesReleased) {
PacketStorage storage;
// First frame ID is close to the maximum value of uint32.
uint32 first_frame_id = 0xffffffff - 5;
const uint32 kFirstFrameId = 0;
StoreFrames(5, kFirstFrameId, &storage);
EXPECT_EQ(std::min<size_t>(kMaxUnackedFrames, 5),
storage.GetNumberOfStoredFrames());
// Store two times the capacity.
StoreFrames(2 * kStoredFrames, first_frame_id, &storage);
EXPECT_EQ(kStoredFrames, storage.GetNumberOfStoredFrames());
uint32 frame_id = first_frame_id;
// Old frames are evicted.
for (size_t i = 0; i < kStoredFrames; ++i) {
EXPECT_FALSE(storage.GetFrame8(frame_id));
++frame_id;
}
// Check recent frames are there.
for (size_t i = 0; i < kStoredFrames; ++i) {
ASSERT_TRUE(storage.GetFrame8(frame_id));
EXPECT_EQ(kStoredFrames + i + 1,
storage.GetFrame8(frame_id)->size());
++frame_id;
for (uint32 frame_id = kFirstFrameId; frame_id < kFirstFrameId + 5;
++frame_id) {
EXPECT_TRUE(storage.GetFrame8(frame_id));
}
storage.ReleaseFrame(kFirstFrameId + 2);
EXPECT_EQ(4u, storage.GetNumberOfStoredFrames());
EXPECT_FALSE(storage.GetFrame8(kFirstFrameId + 2));
storage.ReleaseFrame(kFirstFrameId + 0);
EXPECT_EQ(3u, storage.GetNumberOfStoredFrames());
EXPECT_FALSE(storage.GetFrame8(kFirstFrameId + 0));
storage.ReleaseFrame(kFirstFrameId + 3);
EXPECT_EQ(2u, storage.GetNumberOfStoredFrames());
EXPECT_FALSE(storage.GetFrame8(kFirstFrameId + 3));
storage.ReleaseFrame(kFirstFrameId + 4);
EXPECT_EQ(1u, storage.GetNumberOfStoredFrames());
EXPECT_FALSE(storage.GetFrame8(kFirstFrameId + 4));
storage.ReleaseFrame(kFirstFrameId + 1);
EXPECT_EQ(0u, storage.GetNumberOfStoredFrames());
EXPECT_FALSE(storage.GetFrame8(kFirstFrameId + 1));
}
} // namespace cast
......
......@@ -102,9 +102,7 @@ class TestRtpPacketTransport : public PacketSender {
class RtpPacketizerTest : public ::testing::Test {
protected:
RtpPacketizerTest()
: task_runner_(new test::FakeSingleThreadTaskRunner(&testing_clock_)),
video_frame_(),
packet_storage_(200) {
: task_runner_(new test::FakeSingleThreadTaskRunner(&testing_clock_)) {
config_.sequence_number = kSeqNum;
config_.ssrc = kSsrc;
config_.payload_type = kPayload;
......
......@@ -41,25 +41,22 @@ RtpSender::RtpSender(
RtpSender::~RtpSender() {}
bool RtpSender::Initialize(const CastTransportRtpConfig& config) {
storage_.reset(new PacketStorage(config.stored_frames));
if (!storage_->IsValid()) {
return false;
}
config_.ssrc = config.ssrc;
config_.payload_type = config.rtp_payload_type;
packetizer_.reset(new RtpPacketizer(transport_, storage_.get(), config_));
packetizer_.reset(new RtpPacketizer(transport_, &storage_, config_));
return true;
}
void RtpSender::SendFrame(const EncodedFrame& frame) {
DCHECK(packetizer_);
packetizer_->SendFrameAsPackets(frame);
LOG_IF(DFATAL, storage_.GetNumberOfStoredFrames() > kMaxUnackedFrames)
<< "Possible bug: Frames are not being actively released from storage.";
}
void RtpSender::ResendPackets(
const MissingFramesAndPacketsMap& missing_frames_and_packets,
bool cancel_rtx_if_not_in_list, const DedupInfo& dedup_info) {
DCHECK(storage_);
// Iterate over all frames in the list.
for (MissingFramesAndPacketsMap::const_iterator it =
missing_frames_and_packets.begin();
......@@ -76,7 +73,7 @@ void RtpSender::ResendPackets(
bool resend_last = missing_packet_set.find(kRtcpCastLastPacket) !=
missing_packet_set.end();
const SendPacketVector* stored_packets = storage_->GetFrame8(frame_id);
const SendPacketVector* stored_packets = storage_.GetFrame8(frame_id);
if (!stored_packets)
continue;
......@@ -119,13 +116,14 @@ void RtpSender::ResendPackets(
void RtpSender::CancelSendingFrames(const std::vector<uint32>& frame_ids) {
for (std::vector<uint32>::const_iterator i = frame_ids.begin();
i != frame_ids.end(); ++i) {
const SendPacketVector* stored_packets = storage_->GetFrame8(*i & 0xFF);
const SendPacketVector* stored_packets = storage_.GetFrame8(*i & 0xFF);
if (!stored_packets)
continue;
for (SendPacketVector::const_iterator j = stored_packets->begin();
j != stored_packets->end(); ++j) {
transport_->CancelSendingPacket(j->first);
}
storage_.ReleaseFrame(*i);
}
}
......@@ -157,7 +155,7 @@ void RtpSender::UpdateSequenceNumber(Packet* packet) {
}
int64 RtpSender::GetLastByteSentForFrame(uint32 frame_id) {
const SendPacketVector* stored_packets = storage_->GetFrame8(frame_id & 0xFF);
const SendPacketVector* stored_packets = storage_.GetFrame8(frame_id & 0xFF);
if (!stored_packets)
return 0;
PacketKey last_packet_key = stored_packets->rbegin()->first;
......
......@@ -71,8 +71,8 @@ class RtpSender {
base::TickClock* clock_; // Not owned by this class.
RtpPacketizerConfig config_;
PacketStorage storage_;
scoped_ptr<RtpPacketizer> packetizer_;
scoped_ptr<PacketStorage> storage_;
PacedSender* const transport_;
scoped_refptr<base::SingleThreadTaskRunner> transport_task_runner_;
......
......@@ -32,7 +32,7 @@ AudioSender::AudioSender(scoped_refptr<CastEnvironment> cast_environment,
base::TimeDelta::FromMilliseconds(audio_config.rtcp_interval),
audio_config.frequency,
audio_config.ssrc,
kAudioFrameRate * 2.0, // We lie to increase max outstanding frames.
kAudioFrameRate,
audio_config.min_playout_delay,
audio_config.max_playout_delay,
NewFixedCongestionControl(audio_config.bitrate)),
......@@ -62,11 +62,6 @@ AudioSender::AudioSender(scoped_refptr<CastEnvironment> cast_environment,
transport_config.ssrc = audio_config.ssrc;
transport_config.feedback_ssrc = audio_config.incoming_feedback_ssrc;
transport_config.rtp_payload_type = audio_config.rtp_payload_type;
transport_config.stored_frames =
std::min(kMaxUnackedFrames,
1 + static_cast<int>(max_playout_delay_ *
max_frame_rate_ /
base::TimeDelta::FromSeconds(1)));
transport_config.aes_key = audio_config.aes_key;
transport_config.aes_iv_mask = audio_config.aes_iv_mask;
......@@ -89,13 +84,10 @@ void AudioSender::InsertAudio(scoped_ptr<AudioBus> audio_bus,
}
DCHECK(audio_encoder_.get()) << "Invalid internal state";
// TODO(miu): An |audio_bus| that represents more duration than a single
// frame's duration can defeat our logic here, causing too much data to become
// enqueued. This will be addressed in a soon-upcoming change.
if (ShouldDropNextFrame(recorded_time)) {
VLOG(1) << "Dropping frame due to too many frames currently in-flight.";
const base::TimeDelta next_frame_duration =
RtpDeltaToTimeDelta(audio_bus->frames(), rtp_timebase());
if (ShouldDropNextFrame(next_frame_duration))
return;
}
samples_in_encoder_ += audio_bus->frames();
......@@ -108,6 +100,12 @@ int AudioSender::GetNumberOfFramesInEncoder() const {
return samples_in_encoder_ / audio_encoder_->GetSamplesPerFrame();
}
base::TimeDelta AudioSender::GetInFlightMediaDuration() const {
const int samples_in_flight = samples_in_encoder_ +
GetUnacknowledgedFrameCount() * audio_encoder_->GetSamplesPerFrame();
return RtpDeltaToTimeDelta(samples_in_flight, rtp_timebase());
}
void AudioSender::OnAck(uint32 frame_id) {
}
......
......@@ -52,6 +52,7 @@ class AudioSender : public FrameSender,
protected:
virtual int GetNumberOfFramesInEncoder() const OVERRIDE;
virtual base::TimeDelta GetInFlightMediaDuration() const OVERRIDE;
virtual void OnAck(uint32 frame_id) OVERRIDE;
private:
......
......@@ -13,8 +13,15 @@ namespace {
const int kMinSchedulingDelayMs = 1;
const int kNumAggressiveReportsSentAtStart = 100;
// The additional number of frames that can be in-flight when input exceeds the
// maximum frame rate.
const int kMaxFrameBurst = 5;
} // namespace
// Convenience macro used in logging statements throughout this file.
#define SENDER_SSRC (is_audio_ ? "AUDIO[" : "VIDEO[") << ssrc_ << "] "
FrameSender::FrameSender(scoped_refptr<CastEnvironment> cast_environment,
bool is_audio,
CastTransportSender* const transport_sender,
......@@ -37,8 +44,8 @@ FrameSender::FrameSender(scoped_refptr<CastEnvironment> cast_environment,
last_sent_frame_id_(0),
latest_acked_frame_id_(0),
duplicate_ack_counter_(0),
rtp_timebase_(rtp_timebase),
congestion_control_(congestion_control),
rtp_timebase_(rtp_timebase),
is_audio_(is_audio),
weak_factory_(this) {
DCHECK(transport_sender_);
......@@ -121,7 +128,8 @@ void FrameSender::ResendCheck() {
if (latest_acked_frame_id_ == last_sent_frame_id_) {
// Last frame acked, no point in doing anything
} else {
VLOG(1) << "ACK timeout; last acked frame: " << latest_acked_frame_id_;
VLOG(1) << SENDER_SSRC << "ACK timeout; last acked frame: "
<< latest_acked_frame_id_;
ResendForKickstart();
}
}
......@@ -146,8 +154,8 @@ void FrameSender::ScheduleNextResendCheck() {
void FrameSender::ResendForKickstart() {
DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
DCHECK(!last_send_time_.is_null());
VLOG(1) << "Resending last packet of frame " << last_sent_frame_id_
<< " to kick-start.";
VLOG(1) << SENDER_SSRC << "Resending last packet of frame "
<< last_sent_frame_id_ << " to kick-start.";
last_send_time_ = cast_environment_->Clock()->NowTicks();
transport_sender_->ResendFrameForKickstart(ssrc_, last_sent_frame_id_);
}
......@@ -170,11 +178,29 @@ RtpTimestamp FrameSender::GetRecordedRtpTimestamp(uint32 frame_id) const {
return frame_rtp_timestamps_[frame_id % arraysize(frame_rtp_timestamps_)];
}
int FrameSender::GetUnacknowledgedFrameCount() const {
const int count =
static_cast<int32>(last_sent_frame_id_ - latest_acked_frame_id_);
DCHECK_GE(count, 0);
return count;
}
base::TimeDelta FrameSender::GetAllowedInFlightMediaDuration() const {
// The total amount of allowed in-flight media should equal the amount that fits
// within the entire playout delay window, plus the amount of time it takes to
// receive an ACK from the receiver.
// TODO(miu): Research is needed, but there is likely a better formula.
return target_playout_delay_ + (current_round_trip_time_ / 2);
}
void FrameSender::SendEncodedFrame(
int requested_bitrate_before_encode,
scoped_ptr<EncodedFrame> encoded_frame) {
DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
VLOG(2) << SENDER_SSRC << "About to send another frame: last_sent="
<< last_sent_frame_id_ << ", latest_acked=" << latest_acked_frame_id_;
const uint32 frame_id = encoded_frame->frame_id;
const bool is_first_frame_to_be_sent = last_send_time_.is_null();
......@@ -188,8 +214,8 @@ void FrameSender::SendEncodedFrame(
ScheduleNextResendCheck();
}
VLOG_IF(1, encoded_frame->dependency == EncodedFrame::KEY)
<< "Send encoded key frame; frame_id: " << frame_id;
VLOG_IF(1, !is_audio_ && encoded_frame->dependency == EncodedFrame::KEY)
<< SENDER_SSRC << "Sending encoded key frame, id=" << frame_id;
cast_environment_->Logging()->InsertEncodedFrameEvent(
last_send_time_, FRAME_ENCODED,
......@@ -222,7 +248,8 @@ void FrameSender::SendEncodedFrame(
++num_aggressive_rtcp_reports_sent_;
const bool is_last_aggressive_report =
(num_aggressive_rtcp_reports_sent_ == kNumAggressiveReportsSentAtStart);
VLOG_IF(1, is_last_aggressive_report) << "Sending last aggressive report.";
VLOG_IF(1, is_last_aggressive_report)
<< SENDER_SSRC << "Sending last aggressive report.";
SendRtcpReport(is_last_aggressive_report);
}
......@@ -247,7 +274,8 @@ void FrameSender::OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback) {
// based on it having received a report from here. Therefore, ensure this
// sender stops aggressively sending reports.
if (num_aggressive_rtcp_reports_sent_ < kNumAggressiveReportsSentAtStart) {
VLOG(1) << "No longer a need to send reports aggressively (sent "
VLOG(1) << SENDER_SSRC
<< "No longer a need to send reports aggressively (sent "
<< num_aggressive_rtcp_reports_sent_ << ").";
num_aggressive_rtcp_reports_sent_ = kNumAggressiveReportsSentAtStart;
ScheduleNextRtcpReport();
......@@ -269,7 +297,8 @@ void FrameSender::OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback) {
}
// TODO(miu): The values "2" and "3" should be derived from configuration.
if (duplicate_ack_counter_ >= 2 && duplicate_ack_counter_ % 3 == 2) {
VLOG(1) << "Received duplicate ACK for frame " << latest_acked_frame_id_;
VLOG(1) << SENDER_SSRC << "Received duplicate ACK for frame "
<< latest_acked_frame_id_;
ResendForKickstart();
}
} else {
......@@ -291,7 +320,8 @@ void FrameSender::OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback) {
const bool is_acked_out_of_order =
static_cast<int32>(cast_feedback.ack_frame_id -
latest_acked_frame_id_) < 0;
VLOG(2) << "Received ACK" << (is_acked_out_of_order ? " out-of-order" : "")
VLOG(2) << SENDER_SSRC
<< "Received ACK" << (is_acked_out_of_order ? " out-of-order" : "")
<< " for frame " << cast_feedback.ack_frame_id;
if (!is_acked_out_of_order) {
// Cancel resends of acked frames.
......@@ -305,31 +335,47 @@ void FrameSender::OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback) {
}
}
bool FrameSender::ShouldDropNextFrame(base::TimeTicks capture_time) const {
DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
int frames_in_flight = 0;
base::TimeDelta duration_in_flight;
if (!last_send_time_.is_null()) {
frames_in_flight =
static_cast<int32>(last_sent_frame_id_ - latest_acked_frame_id_);
if (frames_in_flight > 0) {
const uint32 oldest_unacked_frame_id = latest_acked_frame_id_ + 1;
duration_in_flight =
capture_time - GetRecordedReferenceTime(oldest_unacked_frame_id);
}
bool FrameSender::ShouldDropNextFrame(base::TimeDelta frame_duration) const {
// Check that accepting the next frame won't cause more frames to become
// in-flight than the system's design limit.
const int count_frames_in_flight =
GetUnacknowledgedFrameCount() + GetNumberOfFramesInEncoder();
if (count_frames_in_flight >= kMaxUnackedFrames) {
VLOG(1) << SENDER_SSRC << "Dropping: Too many frames would be in-flight.";
return true;
}
// Check that accepting the next frame won't exceed the configured maximum
// frame rate, allowing for short-term bursts.
base::TimeDelta duration_in_flight = GetInFlightMediaDuration();
const double max_frames_in_flight =
max_frame_rate_ * duration_in_flight.InSecondsF();
if (count_frames_in_flight >= max_frames_in_flight + kMaxFrameBurst) {
VLOG(1) << SENDER_SSRC << "Dropping: Burst threshold would be exceeded.";
return true;
}
frames_in_flight += GetNumberOfFramesInEncoder();
VLOG(2) << frames_in_flight
<< " frames in flight; last sent: " << last_sent_frame_id_
<< "; latest acked: " << latest_acked_frame_id_
<< "; frames in encoder: " << GetNumberOfFramesInEncoder()
<< "; duration in flight: "
<< duration_in_flight.InMicroseconds() << " usec ("
<< (target_playout_delay_ > base::TimeDelta() ?
100 * duration_in_flight / target_playout_delay_ :
kint64max) << "%)";
return frames_in_flight >= max_unacked_frames_ ||
duration_in_flight >= target_playout_delay_;
// Check that accepting the next frame won't exceed the allowed in-flight
// media duration.
const base::TimeDelta duration_would_be_in_flight =
duration_in_flight + frame_duration;
const base::TimeDelta allowed_in_flight = GetAllowedInFlightMediaDuration();
if (VLOG_IS_ON(1)) {
const int64 percent = allowed_in_flight > base::TimeDelta() ?
100 * duration_would_be_in_flight / allowed_in_flight : kint64max;
VLOG_IF(1, percent > 50)
<< SENDER_SSRC
<< duration_in_flight.InMicroseconds() << " usec in-flight + "
<< frame_duration.InMicroseconds() << " usec for next frame --> "
<< percent << "% of allowed in-flight.";
}
if (duration_would_be_in_flight > allowed_in_flight) {
VLOG(1) << SENDER_SSRC << "Dropping: In-flight duration would be too high.";
return true;
}
// Next frame is accepted.
return false;
}
} // namespace cast
......
......@@ -34,6 +34,8 @@ class FrameSender {
CongestionControl* congestion_control);
virtual ~FrameSender();
int rtp_timebase() const { return rtp_timebase_; }
// Calling this function is only valid if the receiver supports the
// "extra_playout_delay" RTP extension.
void SetTargetPlayoutDelay(base::TimeDelta new_target_playout_delay);
......@@ -50,6 +52,10 @@ class FrameSender {
// Returns the number of frames in the encoder's backlog.
virtual int GetNumberOfFramesInEncoder() const = 0;
// Returns the duration of the data in the encoder's backlog plus the duration
// of sent, unacknowledged frames.
virtual base::TimeDelta GetInFlightMediaDuration() const = 0;
// Called when we get an ACK for a frame.
virtual void OnAck(uint32 frame_id) = 0;
......@@ -84,11 +90,9 @@ class FrameSender {
// Protected for testability.
void OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback);
// Returns true if there are too many frames in flight, or if the media
// duration of the frames in flight would be too high by sending the next
// frame. The latter metric is determined from the given |capture_time|
// for the next frame to be encoded and sent.
bool ShouldDropNextFrame(base::TimeTicks capture_time) const;
// Returns true if too many frames would be in-flight by encoding and sending
// the next frame having the given |frame_duration|.
bool ShouldDropNextFrame(base::TimeDelta frame_duration) const;
// Record or retrieve a recent history of each frame's timestamps.
// Warning: If a frame ID too far in the past is requested, the getters will
......@@ -100,6 +104,9 @@ class FrameSender {
base::TimeTicks GetRecordedReferenceTime(uint32 frame_id) const;
RtpTimestamp GetRecordedRtpTimestamp(uint32 frame_id) const;
// Returns the number of frames that were sent but not yet acknowledged.
int GetUnacknowledgedFrameCount() const;
const base::TimeDelta rtcp_interval_;
// The total amount of time between a frame's capture/recording on the sender
......@@ -151,9 +158,6 @@ class FrameSender {
// STATUS_VIDEO_INITIALIZED.
CastInitializationStatus cast_initialization_status_;
// RTP timestamp increment representing one second.
const int rtp_timebase_;
// This object controls how we change the bitrate to make sure the
// buffer doesn't overflow.
scoped_ptr<CongestionControl> congestion_control_;
......@@ -162,6 +166,13 @@ class FrameSender {
base::TimeDelta current_round_trip_time_;
private:
// Returns the maximum media duration currently allowed in-flight. This
// fluctuates in response to the currently-measured network latency.
base::TimeDelta GetAllowedInFlightMediaDuration() const;
// RTP timestamp increment representing one second.
const int rtp_timebase_;
const bool is_audio_;
// Ring buffers to keep track of recent frame timestamps (both in terms of
......
......@@ -19,16 +19,21 @@
namespace media {
namespace cast {
namespace {
// The following two constants are used to adjust the target
// playout delay (when allowed). They were calculated using
// a combination of cast_benchmark runs and manual testing.
//
// This is how many round trips we think we need on the network.
const int kRoundTripsNeeded = 4;
// This is an estimate of all the the constant time needed
// independent of network quality.
// This is an estimate of all the constant time needed independent of
// network quality (e.g., additional time that accounts for encode and decode
// time).
const int kConstantTimeMs = 75;
} // namespace
// Note, we use a fixed bitrate value when external video encoder is used.
// Some hardware encoder shows bad behavior if we set the bitrate too
// frequently, e.g. quality drop, not abiding by target bitrate, etc.
......@@ -95,11 +100,6 @@ VideoSender::VideoSender(
transport_config.ssrc = video_config.ssrc;
transport_config.feedback_ssrc = video_config.incoming_feedback_ssrc;
transport_config.rtp_payload_type = video_config.rtp_payload_type;
transport_config.stored_frames =
std::min(kMaxUnackedFrames,
1 + static_cast<int>(max_playout_delay_ *
max_frame_rate_ /
base::TimeDelta::FromSeconds(1)));
transport_config.aes_key = video_config.aes_key;
transport_config.aes_iv_mask = video_config.aes_iv_mask;
......@@ -141,8 +141,26 @@ void VideoSender::InsertRawVideoFrame(
"timestamp", capture_time.ToInternalValue(),
"rtp_timestamp", rtp_timestamp);
if (ShouldDropNextFrame(capture_time)) {
VLOG(1) << "Dropping frame due to too many frames currently in-flight.";
// Drop the frame if its reference timestamp is not an increase over the last
// frame's. This protects: 1) the duration calculations that assume
// timestamps are monotonically non-decreasing, and 2) assumptions made deeper
// in the implementation where each frame's RTP timestamp needs to be unique.
if (!last_enqueued_frame_reference_time_.is_null() &&
capture_time <= last_enqueued_frame_reference_time_) {
VLOG(1) << "Dropping video frame: Reference time did not increase.";
return;
}
// Two video frames are needed to compute the exact media duration added by
// the next frame. If there are no frames in the encoder, compute a guess
// based on the configured |max_frame_rate_|. Any error introduced by this
// guess will be eliminated when |duration_in_encoder_| is updated in
// OnEncodedVideoFrame().
const base::TimeDelta duration_added_by_next_frame = frames_in_encoder_ > 0 ?
capture_time - last_enqueued_frame_reference_time_ :
base::TimeDelta::FromSecondsD(1.0 / max_frame_rate_);
if (ShouldDropNextFrame(duration_added_by_next_frame)) {
base::TimeDelta new_target_delay = std::min(
current_round_trip_time_ * kRoundTripsNeeded +
base::TimeDelta::FromMilliseconds(kConstantTimeMs),
......@@ -168,6 +186,8 @@ void VideoSender::InsertRawVideoFrame(
weak_factory_.GetWeakPtr(),
bitrate))) {
frames_in_encoder_++;
duration_in_encoder_ += duration_added_by_next_frame;
last_enqueued_frame_reference_time_ = capture_time;
} else {
VLOG(1) << "Encoder rejected a frame. Skipping...";
}
......@@ -177,6 +197,16 @@ int VideoSender::GetNumberOfFramesInEncoder() const {
return frames_in_encoder_;
}
base::TimeDelta VideoSender::GetInFlightMediaDuration() const {
if (GetUnacknowledgedFrameCount() > 0) {
const uint32 oldest_unacked_frame_id = latest_acked_frame_id_ + 1;
return last_enqueued_frame_reference_time_ -
GetRecordedReferenceTime(oldest_unacked_frame_id);
} else {
return duration_in_encoder_;
}
}
void VideoSender::OnAck(uint32 frame_id) {
video_encoder_->LatestFrameIdToReference(frame_id);
}
......@@ -196,6 +226,9 @@ void VideoSender::OnEncodedVideoFrame(
frames_in_encoder_--;
DCHECK_GE(frames_in_encoder_, 0);
duration_in_encoder_ =
last_enqueued_frame_reference_time_ - encoded_frame->reference_time;
SendEncodedFrame(encoder_bitrate, encoded_frame.Pass());
}
......
......@@ -58,6 +58,7 @@ class VideoSender : public FrameSender,
protected:
virtual int GetNumberOfFramesInEncoder() const OVERRIDE;
virtual base::TimeDelta GetInFlightMediaDuration() const OVERRIDE;
virtual void OnAck(uint32 frame_id) OVERRIDE;
private:
......@@ -78,6 +79,12 @@ class VideoSender : public FrameSender,
// The number of frames queued for encoding, but not yet sent.
int frames_in_encoder_;
// The duration of video queued for encoding, but not yet sent.
base::TimeDelta duration_in_encoder_;
// The timestamp of the frame that was last enqueued in |video_encoder_|.
base::TimeTicks last_enqueued_frame_reference_time_;
// Remember what we set the bitrate to before, no need to set it again if
// we get the same value.
uint32 last_bitrate_;
......
......@@ -1398,6 +1398,32 @@ TEST_F(End2EndTest, EvilNetwork) {
base::TimeTicks test_end = testing_clock_receiver_->NowTicks();
RunTasks(100 * kFrameTimerMs + 1); // Empty the pipeline.
EXPECT_GT(video_ticks_.size(), 100ul);
VLOG(1) << "Fully transmitted " << video_ticks_.size()
<< " out of 10000 frames.";
EXPECT_LT((video_ticks_.back().second - test_end).InMilliseconds(), 1000);
}
// Tests that a system configured for 30 FPS drops frames when input is provided
// at a much higher frame rate.
TEST_F(End2EndTest, ShoveHighFrameRateDownYerThroat) {
Configure(CODEC_VIDEO_FAKE, CODEC_AUDIO_PCM16, 32000,
1);
receiver_to_sender_.SetPacketPipe(test::EvilNetwork().Pass());
sender_to_receiver_.SetPacketPipe(test::EvilNetwork().Pass());
Create();
StartBasicPlayer();
int frames_counter = 0;
for (; frames_counter < 10000; ++frames_counter) {
SendFakeVideoFrame(testing_clock_sender_->NowTicks());
RunTasks(10 /* 10 ms, but 33.3 expected by system */);
}
base::TimeTicks test_end = testing_clock_receiver_->NowTicks();
RunTasks(100 * kFrameTimerMs + 1); // Empty the pipeline.
EXPECT_LT(100ul, video_ticks_.size());
EXPECT_GE(3334ul, video_ticks_.size());
VLOG(1) << "Fully transmitted " << video_ticks_.size()
<< " out of 10000 frames.";
EXPECT_LT((video_ticks_.back().second - test_end).InMilliseconds(), 1000);
}
......