Commit 439465f8 authored by Fredrik Hubinette's avatar Fredrik Hubinette Committed by Commit Bot

Delay and batch seeks to make them smarter

Previously we would call SeekTask for every
completed read. This often cause us to seek
to somewhere and change our mind a very
short time later. By delaying our seeks and
batching them together, we can reduce the
number of times this happens.

I've tested this on a a bunch of badly muxed
videos, and generally got improvements. For
my primary test video, the number of connections
went from ~110 to ~70. As far as I can tell,
this has no impact on well-muxed videos.

Bug: 765313
Change-Id: I1cfdd803e9cda8fd45fae30ec2c0343f08a79edd
Reviewed-on: https://chromium-review.googlesource.com/690978
Commit-Queue: Fredrik Hubinette <hubbe@chromium.org>
Reviewed-by: default avatarDale Curtis <dalecurtis@chromium.org>
Reviewed-by: default avatarFrank Liberato <liberato@chromium.org>
Cr-Commit-Position: refs/heads/master@{#506667}
parent 5501e5ad
...@@ -53,6 +53,9 @@ const int kSlowPreloadPercentage = 10; ...@@ -53,6 +53,9 @@ const int kSlowPreloadPercentage = 10;
// Update buffer sizes every 32 progress updates. // Update buffer sizes every 32 progress updates.
const int kUpdateBufferSizeFrequency = 32; const int kUpdateBufferSizeFrequency = 32;
// How long to we delay a seek after a read?
constexpr base::TimeDelta kSeekDelay = base::TimeDelta::FromMilliseconds(20);
} // namespace } // namespace
namespace media { namespace media {
...@@ -358,10 +361,16 @@ void MultibufferDataSource::Read(int64_t position, ...@@ -358,10 +361,16 @@ void MultibufferDataSource::Read(int64_t position,
if (reader_) { if (reader_) {
int bytes_read = reader_->TryReadAt(position, data, size); int bytes_read = reader_->TryReadAt(position, data, size);
if (bytes_read > 0) { if (bytes_read > 0) {
render_task_runner_->PostTask( bytes_read_ += bytes_read;
FROM_HERE, seek_positions_.push_back(position + bytes_read);
base::Bind(&MultibufferDataSource::SeekTask, if (seek_positions_.size() == 1) {
weak_factory_.GetWeakPtr(), position, bytes_read)); render_task_runner_->PostDelayedTask(
FROM_HERE,
base::Bind(&MultibufferDataSource::SeekTask,
weak_factory_.GetWeakPtr()),
kSeekDelay);
}
read_cb.Run(bytes_read); read_cb.Run(bytes_read);
return; return;
} }
...@@ -413,50 +422,81 @@ void MultibufferDataSource::ReadTask() { ...@@ -413,50 +422,81 @@ void MultibufferDataSource::ReadTask() {
static_cast<int>(std::min<int64_t>(available, read_op_->size())); static_cast<int>(std::min<int64_t>(available, read_op_->size()));
bytes_read = bytes_read =
reader_->TryReadAt(read_op_->position(), read_op_->data(), bytes_read); reader_->TryReadAt(read_op_->position(), read_op_->data(), bytes_read);
url_data_->AddBytesRead(bytes_read);
int64_t new_pos = read_op_->position() + bytes_read; bytes_read_ += bytes_read;
// If we're seeking to a new location, (not just slightly further seek_positions_.push_back(read_op_->position() + bytes_read);
// in the file) and we have more data buffered in that new location
// than in our current location, then we don't actually seek anywhere.
// Instead we keep preloading at the old location a while longer.
if (reader_->AvailableAt(new_pos) <= reader_->Available())
reader_->Seek(new_pos);
if (bytes_read == 0 && total_bytes_ == kPositionNotSpecified) { if (bytes_read == 0 && total_bytes_ == kPositionNotSpecified) {
// We've reached the end of the file and we didn't know the total size // We've reached the end of the file and we didn't know the total size
// before. Update the total size so Read()s past the end of the file will // before. Update the total size so Read()s past the end of the file will
// fail like they would if we had known the file size at the beginning. // fail like they would if we had known the file size at the beginning.
total_bytes_ = reader_->Tell(); total_bytes_ = read_op_->position() + bytes_read;
if (total_bytes_ != kPositionNotSpecified) if (total_bytes_ != kPositionNotSpecified)
host_->SetTotalBytes(total_bytes_); host_->SetTotalBytes(total_bytes_);
} }
ReadOperation::Run(std::move(read_op_), bytes_read); ReadOperation::Run(std::move(read_op_), bytes_read);
SeekTask_Locked();
} else { } else {
reader_->Seek(read_op_->position()); reader_->Seek(read_op_->position());
reader_->Wait(1, base::Bind(&MultibufferDataSource::ReadTask, reader_->Wait(1, base::Bind(&MultibufferDataSource::ReadTask,
weak_factory_.GetWeakPtr())); weak_factory_.GetWeakPtr()));
UpdateLoadingState_Locked(false);
} }
UpdateLoadingState_Locked(false);
} }
void MultibufferDataSource::SeekTask(int64_t pos, int bytes_read) { void MultibufferDataSource::SeekTask() {
DCHECK(render_task_runner_->BelongsToCurrentThread()); DCHECK(render_task_runner_->BelongsToCurrentThread());
DCHECK_GT(bytes_read, 0);
base::AutoLock auto_lock(lock_); base::AutoLock auto_lock(lock_);
SeekTask_Locked();
}
void MultibufferDataSource::SeekTask_Locked() {
DCHECK(render_task_runner_->BelongsToCurrentThread());
lock_.AssertAcquired();
if (stop_signal_received_) if (stop_signal_received_)
return; return;
url_data_->AddBytesRead(bytes_read); // A read operation is pending, which will call SeekTask_Locked when
// it's done. We'll defer any seeking until the read op is done.
if (read_op_)
return;
url_data_->AddBytesRead(bytes_read_);
bytes_read_ = 0;
int64_t new_pos = pos + bytes_read; if (reader_) {
// If we're seeking to a new location, (not just slightly further // If we're seeking to a new location, (not just slightly further
// in the file) and we have more data buffered in that new location // in the file) and we have more data buffered in that new location
// than in our current location, then we don't actually seek anywhere. // than in our current location, then we don't actually seek anywhere.
// Instead we keep preloading at the old location a while longer. // Instead we keep preloading at the old location a while longer.
if (reader_ && reader_->AvailableAt(new_pos) <= reader_->Available())
reader_->Seek(new_pos); int64_t pos = reader_->Tell();
int64_t available = reader_->Available();
// Iterate backwards, because if two positions have the same
// amount of buffered data, we probably want to prefer the latest
// one in the array.
for (auto i = seek_positions_.rbegin(); i != seek_positions_.rend(); ++i) {
int64_t new_pos = *i;
int64_t available_at_new_pos = reader_->AvailableAt(new_pos);
if (total_bytes_ != kPositionNotSpecified) {
if (new_pos + available_at_new_pos >= total_bytes_) {
// Buffer reaches end of file, no need to seek here.
continue;
}
}
if (available_at_new_pos < available) {
pos = new_pos;
available = available_at_new_pos;
}
}
reader_->Seek(pos);
}
seek_positions_.clear();
UpdateLoadingState_Locked(false); UpdateLoadingState_Locked(false);
} }
......
...@@ -128,9 +128,14 @@ class MEDIA_BLINK_EXPORT MultibufferDataSource : public DataSource { ...@@ -128,9 +128,14 @@ class MEDIA_BLINK_EXPORT MultibufferDataSource : public DataSource {
// Task posted to perform actual reading on the render thread. // Task posted to perform actual reading on the render thread.
void ReadTask(); void ReadTask();
// When reading happens in read(), this does the parts which has to happen // After a read, this function updates the read position.
// on the renderer thread. // It's in a separate function because the read itself can either happen
void SeekTask(int64_t pos, int bytes_read); // in ReadTask() or in Read(), both of which call this function afterwards.
void SeekTask_Locked();
// Lock |lock_| lock and call SeekTask_Locked().
// Called with PostTask when read() complets on the demuxer thread.
void SeekTask();
// Cancels oustanding callbacks and sets |stop_signal_received_|. Safe to call // Cancels oustanding callbacks and sets |stop_signal_received_|. Safe to call
// from any thread. // from any thread.
...@@ -168,6 +173,15 @@ class MEDIA_BLINK_EXPORT MultibufferDataSource : public DataSource { ...@@ -168,6 +173,15 @@ class MEDIA_BLINK_EXPORT MultibufferDataSource : public DataSource {
// determined by reaching EOF. // determined by reaching EOF.
int64_t total_bytes_; int64_t total_bytes_;
// Bytes we've read but not reported to the url_data yet.
// SeekTask handles the reporting.
int64_t bytes_read_ = 0;
// Places we might want to seek to. After each read we add another
// location here, and when SeekTask() is called, it picks the best
// position and then clears it out.
std::vector<int64_t> seek_positions_;
// This value will be true if this data source can only support streaming. // This value will be true if this data source can only support streaming.
// i.e. range request is not supported. // i.e. range request is not supported.
bool streaming_; bool streaming_;
......
...@@ -186,6 +186,7 @@ class MockMultibufferDataSource : public MultibufferDataSource { ...@@ -186,6 +186,7 @@ class MockMultibufferDataSource : public MultibufferDataSource {
bool downloading() { return downloading_; } bool downloading() { return downloading_; }
void set_downloading(bool downloading) { downloading_ = downloading; } void set_downloading(bool downloading) { downloading_ = downloading; }
bool range_supported() { return url_data_->range_supported(); } bool range_supported() { return url_data_->range_supported(); }
void CallSeekTask() { SeekTask(); }
private: private:
// Whether the resource is downloading or deferred. // Whether the resource is downloading or deferred.
...@@ -1648,9 +1649,50 @@ TEST_F(MultibufferDataSourceTest, Http_Seek_Back) { ...@@ -1648,9 +1649,50 @@ TEST_F(MultibufferDataSourceTest, Http_Seek_Back) {
// more data buffered at this location than at kFarReadPosition. // more data buffered at this location than at kFarReadPosition.
EXPECT_CALL(*this, ReadCallback(kDataSize)); EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(0); ReadAt(0);
data_source_->CallSeekTask();
EXPECT_EQ(kFarReadPosition + kDataSize, loader()->Tell());
// Again, no seek.
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(0);
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(kDataSize);
data_source_->CallSeekTask();
EXPECT_EQ(kFarReadPosition + kDataSize, loader()->Tell()); EXPECT_EQ(kFarReadPosition + kDataSize, loader()->Tell());
// Still no seek
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(kFarReadPosition);
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(0);
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(kDataSize);
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(kDataSize * 2);
data_source_->CallSeekTask();
EXPECT_EQ(kFarReadPosition + kDataSize, loader()->Tell());
// Read some data from far ahead, but right before where we read before.
// This time we'll have one block buffered.
ReadAt(kFarReadPosition - kDataSize);
EXPECT_CALL(*this, ReadCallback(kDataSize));
EXPECT_CALL(host_, AddBufferedByteRange(kFarReadPosition - kDataSize,
kFarReadPosition + kDataSize));
Respond(response_generator_->Generate206(kFarReadPosition - kDataSize));
ReceiveData(kDataSize);
// No Seek
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(0);
data_source_->CallSeekTask();
EXPECT_EQ(kFarReadPosition, loader()->Tell());
// Seek
EXPECT_CALL(*this, ReadCallback(kDataSize));
ReadAt(kDataSize * 2);
data_source_->CallSeekTask();
EXPECT_EQ(kDataSize * 3, loader()->Tell());
Stop(); Stop();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment