Commit 71e4fcee authored by Guido Urdaneta's avatar Guido Urdaneta Committed by Chromium LUCI CQ

[BreakoutBox] Update queue management in MediaStreamTrackProcessor

This CL updates queue management for MSTP as follows:

1. The main queue management now occurs in the underlying source and
not in the stream controller. A frame is enqueued in the controller
when pull() is invoked, or a when a new frame becomes available after
a previous pull with an empty queue.

2. When the queue is full and more frames arrive (i.e., in a situation
of slow consumption), old frames are dropped. Before this CL, incoming
frames were dropped when the controller queue desired size exceeded a
threshold.

3. The queue size is now configurable from the JS side (via a
constructor parameter). This makes it possible to support more use
cases than with a fixed queue. The new parameter will also be useful
for audio when it becomes available.

Fixed: 1157605, 1157604
Change-Id: I0260f7d5fe0495c33cfc022ce534ef04c518e90b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2587034
Commit-Queue: Guido Urdaneta <guidou@chromium.org>
Reviewed-by: default avatarThomas Guilbert <tguilbert@chromium.org>
Cr-Commit-Position: refs/heads/master@{#836229}
parent b7336d90
......@@ -19,8 +19,9 @@ namespace blink {
MediaStreamTrackProcessor::MediaStreamTrackProcessor(
ScriptState* script_state,
MediaStreamComponent* input_track)
: input_track_(input_track) {
MediaStreamComponent* input_track,
uint16_t buffer_size)
: input_track_(input_track), buffer_size_(buffer_size) {
DCHECK(input_track_);
UseCounter::Count(ExecutionContext::From(script_state),
WebFeature::kMediaStreamTrackProcessor);
......@@ -37,8 +38,8 @@ void MediaStreamTrackProcessor::CreateVideoSourceStream(
ScriptState* script_state) {
DCHECK(!source_stream_);
video_underlying_source_ =
MakeGarbageCollected<MediaStreamVideoTrackUnderlyingSource>(script_state,
input_track_);
MakeGarbageCollected<MediaStreamVideoTrackUnderlyingSource>(
script_state, input_track_, buffer_size_);
source_stream_ = ReadableStream::CreateWithCountQueueingStrategy(
script_state, video_underlying_source_, /*high_water_mark=*/0);
}
......@@ -46,6 +47,7 @@ void MediaStreamTrackProcessor::CreateVideoSourceStream(
MediaStreamTrackProcessor* MediaStreamTrackProcessor::Create(
ScriptState* script_state,
MediaStreamTrack* track,
uint16_t buffer_size,
ExceptionState& exception_state) {
if (!track) {
exception_state.ThrowDOMException(DOMExceptionCode::kOperationError,
......@@ -53,6 +55,12 @@ MediaStreamTrackProcessor* MediaStreamTrackProcessor::Create(
return nullptr;
}
if (track->readyState() == "ended") {
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"Input track cannot be ended");
return nullptr;
}
if (track->kind() != "video") {
exception_state.ThrowDOMException(DOMExceptionCode::kNotSupportedError,
"Only video tracks are supported");
......@@ -60,14 +68,32 @@ MediaStreamTrackProcessor* MediaStreamTrackProcessor::Create(
}
if (!script_state->ContextIsValid()) {
exception_state.ThrowDOMException(DOMExceptionCode::kNotSupportedError,
exception_state.ThrowDOMException(DOMExceptionCode::kInvalidStateError,
"The context has been destroyed");
return nullptr;
}
return MakeGarbageCollected<MediaStreamTrackProcessor>(script_state,
track->Component());
return MakeGarbageCollected<MediaStreamTrackProcessor>(
script_state, track->Component(), buffer_size);
}
MediaStreamTrackProcessor* MediaStreamTrackProcessor::Create(
ScriptState* script_state,
MediaStreamTrack* track,
ExceptionState& exception_state) {
if (!track) {
exception_state.ThrowDOMException(DOMExceptionCode::kOperationError,
"Input track cannot be null");
return nullptr;
}
// Using 1 as default buffer size for video since by default we do not want
// to buffer, as buffering interferes with MediaStream sources that drop
// frames if they start to be buffered (e.g, camera sources).
// Using 10 as default for audio, which coincides with the buffer size for
// the Web Audio MediaStream sink.
uint16_t buffer_size = track->kind() == "video" ? 1u : 10u;
return Create(script_state, track, buffer_size, exception_state);
}
void MediaStreamTrackProcessor::Trace(Visitor* visitor) const {
......
......@@ -23,8 +23,14 @@ class MODULES_EXPORT MediaStreamTrackProcessor : public ScriptWrappable {
public:
static MediaStreamTrackProcessor* Create(ScriptState*,
MediaStreamTrack*,
uint16_t buffer_size,
ExceptionState&);
MediaStreamTrackProcessor(ScriptState*, MediaStreamComponent*);
static MediaStreamTrackProcessor* Create(ScriptState*,
MediaStreamTrack*,
ExceptionState&);
MediaStreamTrackProcessor(ScriptState*,
MediaStreamComponent*,
uint16_t buffer_size);
MediaStreamTrackProcessor(const MediaStreamTrackProcessor&) = delete;
MediaStreamTrackProcessor& operator=(const MediaStreamTrackProcessor&) =
delete;
......@@ -42,6 +48,7 @@ class MODULES_EXPORT MediaStreamTrackProcessor : public ScriptWrappable {
Member<MediaStreamComponent> input_track_;
Member<MediaStreamVideoTrackUnderlyingSource> video_underlying_source_;
Member<ReadableStream> source_stream_;
uint16_t buffer_size_;
};
} // namespace blink
......
......@@ -10,7 +10,7 @@
]
interface MediaStreamTrackProcessor {
[CallWith=ScriptState, RaisesException, MeasureAs=MediaStreamTrackProcessor]
constructor(MediaStreamTrack track);
constructor(MediaStreamTrack track, optional unsigned short bufferSize);
// This stream returns VideoFrame objects.
// TODO(crbug.com/1142955): Add support for audio.
......
......@@ -237,6 +237,24 @@ TEST_F(MediaStreamTrackProcessorTest, NullInputTrack) {
DOMExceptionCode::kOperationError);
}
TEST_F(MediaStreamTrackProcessorTest, EndedTrack) {
V8TestingScope v8_scope;
ScriptState* script_state = v8_scope.GetScriptState();
ExceptionState& exception_state = v8_scope.GetExceptionState();
PushableMediaStreamVideoSource* pushable_video_source =
CreatePushableVideoSource();
MediaStreamTrack* track = CreateVideoMediaStreamTrack(
v8_scope.GetExecutionContext(), pushable_video_source);
track->stopTrack(v8_scope.GetExecutionContext());
MediaStreamTrackProcessor* track_processor =
MediaStreamTrackProcessor::Create(script_state, track, exception_state);
EXPECT_EQ(track_processor, nullptr);
EXPECT_TRUE(exception_state.HadException());
EXPECT_EQ(static_cast<DOMExceptionCode>(v8_scope.GetExceptionState().Code()),
DOMExceptionCode::kInvalidStateError);
}
// TODO(crbug.com/1142955): Add support for audio.
TEST_F(MediaStreamTrackProcessorTest, Audio) {
V8TestingScope v8_scope;
......
......@@ -21,18 +21,26 @@ namespace blink {
MediaStreamVideoTrackUnderlyingSource::MediaStreamVideoTrackUnderlyingSource(
ScriptState* script_state,
MediaStreamComponent* track)
MediaStreamComponent* track,
wtf_size_t max_queue_size)
: UnderlyingSourceBase(script_state),
main_task_runner_(ExecutionContext::From(script_state)
->GetTaskRunner(TaskType::kInternalMediaRealTime)),
track_(track) {
track_(track),
max_queue_size_(std::max(1u, max_queue_size)) {
DCHECK(track_);
}
ScriptPromise MediaStreamVideoTrackUnderlyingSource::pull(
ScriptState* script_state) {
DCHECK(main_task_runner_->BelongsToCurrentThread());
// No backpressure support, so nothing to do here.
if (!queue_.empty()) {
ProcessPullRequest();
} else {
is_pending_pull_ = true;
}
DCHECK_LT(queue_.size(), max_queue_size_);
return ScriptPromise::CastUndefined(script_state);
}
......@@ -68,12 +76,16 @@ void MediaStreamVideoTrackUnderlyingSource::Trace(Visitor* visitor) const {
UnderlyingSourceBase::Trace(visitor);
}
double MediaStreamVideoTrackUnderlyingSource::DesiredSizeForTesting() const {
return Controller()->DesiredSize();
}
void MediaStreamVideoTrackUnderlyingSource::Close() {
DCHECK(main_task_runner_->BelongsToCurrentThread());
DisconnectFromTrack();
if (Controller())
Controller()->Close();
queue_.clear();
}
void MediaStreamVideoTrackUnderlyingSource::OnFrameFromTrack(
......@@ -91,15 +103,38 @@ void MediaStreamVideoTrackUnderlyingSource::OnFrameFromTrackOnMainThread(
scoped_refptr<media::VideoFrame> media_frame,
base::TimeTicks /*estimated_capture_time*/) {
DCHECK(main_task_runner_->BelongsToCurrentThread());
// Drop the frame if there is already a queued frame in the controller.
// Queueing even a small number of frames can result in significant
// performance issues, so do not allow queueing more than one frame.
if (!Controller() || Controller()->DesiredSize() < 0)
DCHECK_LE(queue_.size(), max_queue_size_);
// If the |queue_| is empty and the consumer has signaled a pull, bypass
// |queue_| and send the frame directly to the stream controller.
if (queue_.empty() && is_pending_pull_) {
SendFrameToStream(std::move(media_frame));
return;
}
if (queue_.size() == max_queue_size_)
queue_.pop_front();
queue_.push_back(std::move(media_frame));
if (is_pending_pull_) {
ProcessPullRequest();
}
}
void MediaStreamVideoTrackUnderlyingSource::ProcessPullRequest() {
DCHECK(!queue_.empty());
SendFrameToStream(std::move(queue_.front()));
queue_.pop_front();
}
void MediaStreamVideoTrackUnderlyingSource::SendFrameToStream(
scoped_refptr<media::VideoFrame> media_frame) {
DCHECK(media_frame);
DCHECK(Controller());
VideoFrame* video_frame = MakeGarbageCollected<VideoFrame>(
std::move(media_frame), GetExecutionContext());
Controller()->Enqueue(video_frame);
is_pending_pull_ = false;
}
} // namespace blink
......@@ -9,6 +9,7 @@
#include "third_party/blink/public/web/modules/mediastream/media_stream_video_sink.h"
#include "third_party/blink/renderer/core/streams/underlying_source_base.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/wtf/deque.h"
namespace blink {
......@@ -19,7 +20,8 @@ class MODULES_EXPORT MediaStreamVideoTrackUnderlyingSource
public MediaStreamVideoSink {
public:
explicit MediaStreamVideoTrackUnderlyingSource(ScriptState*,
MediaStreamComponent*);
MediaStreamComponent*,
wtf_size_t queue_size);
MediaStreamVideoTrackUnderlyingSource(
const MediaStreamVideoTrackUnderlyingSource&) = delete;
MediaStreamVideoTrackUnderlyingSource& operator=(
......@@ -30,21 +32,36 @@ class MODULES_EXPORT MediaStreamVideoTrackUnderlyingSource
ScriptPromise Start(ScriptState*) override;
ScriptPromise Cancel(ScriptState*, ScriptValue reason) override;
MediaStreamComponent* Track() const { return track_.Get(); }
wtf_size_t MaxQueueSize() const { return max_queue_size_; }
bool IsPendingPullForTesting() const { return is_pending_pull_; }
const Deque<scoped_refptr<media::VideoFrame>>& QueueForTesting() const {
return queue_;
}
double DesiredSizeForTesting() const;
void Close();
void Trace(Visitor*) const override;
private:
FRIEND_TEST_ALL_PREFIXES(MediaStreamVideoTrackUnderlyingSourceTest,
FramesAreDropped);
void OnFrameFromTrack(scoped_refptr<media::VideoFrame> media_frame,
base::TimeTicks estimated_capture_time);
void OnFrameFromTrackOnMainThread(
scoped_refptr<media::VideoFrame> media_frame,
base::TimeTicks estimated_capture_time);
void SendFrameToStream(scoped_refptr<media::VideoFrame> media_frame);
void ProcessPullRequest();
const scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
const Member<MediaStreamComponent> track_;
// An internal deque prior to the stream controller's queue. It acts as a ring
// buffer and allows dropping old frames instead of new ones in case frames
// accumulate due to slow consumption.
Deque<scoped_refptr<media::VideoFrame>> queue_;
const wtf_size_t max_queue_size_;
bool is_pending_pull_ = false;
};
} // namespace blink
......
......@@ -4,6 +4,8 @@
#include "third_party/blink/renderer/modules/mediastream/media_stream_video_track_underlying_source.h"
#include "base/run_loop.h"
#include "base/test/gmock_callback_support.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/blink/public/platform/modules/mediastream/web_media_stream_track.h"
#include "third_party/blink/public/web/web_heap.h"
......@@ -14,11 +16,14 @@
#include "third_party/blink/renderer/core/streams/readable_stream_default_controller_with_script_scope.h"
#include "third_party/blink/renderer/modules/mediastream/media_stream_track.h"
#include "third_party/blink/renderer/modules/mediastream/media_stream_video_track.h"
#include "third_party/blink/renderer/modules/mediastream/mock_media_stream_video_sink.h"
#include "third_party/blink/renderer/modules/mediastream/pushable_media_stream_video_source.h"
#include "third_party/blink/renderer/platform/bindings/exception_state.h"
#include "third_party/blink/renderer/platform/testing/io_task_runner_testing_platform_support.h"
#include "third_party/blink/renderer/platform/testing/testing_platform_support.h"
using testing::_;
namespace blink {
class MediaStreamVideoTrackUnderlyingSourceTest : public testing::Test {
......@@ -51,23 +56,37 @@ class MediaStreamVideoTrackUnderlyingSourceTest : public testing::Test {
MediaStreamVideoTrackUnderlyingSource* CreateSource(
ScriptState* script_state,
MediaStreamComponent* track) {
MediaStreamComponent* track,
wtf_size_t buffer_size) {
return MakeGarbageCollected<MediaStreamVideoTrackUnderlyingSource>(
script_state, track);
script_state, track, buffer_size);
}
MediaStreamVideoTrackUnderlyingSource* CreateSource(
ScriptState* script_state) {
ScriptState* script_state,
MediaStreamComponent* track) {
return CreateSource(script_state, track, 1u);
}
MediaStreamVideoTrackUnderlyingSource* CreateSource(ScriptState* script_state,
wtf_size_t buffer_size) {
MediaStreamComponent* track =
CreateTrack(ExecutionContext::From(script_state));
return MakeGarbageCollected<MediaStreamVideoTrackUnderlyingSource>(
script_state, track);
return CreateSource(script_state, track, buffer_size);
}
MediaStreamVideoTrackUnderlyingSource* CreateSource(
ScriptState* script_state) {
return CreateSource(script_state, 1u);
}
protected:
void PushFrame() {
void PushFrame(
const base::Optional<base::TimeDelta>& timestamp = base::nullopt) {
const scoped_refptr<media::VideoFrame> frame =
media::VideoFrame::CreateBlackFrame(gfx::Size(10, 5));
if (timestamp)
frame->set_timestamp(*timestamp);
pushable_video_source_->PushFrame(frame, base::TimeTicks());
platform_->RunUntilIdle();
}
......@@ -121,27 +140,106 @@ TEST_F(MediaStreamVideoTrackUnderlyingSourceTest,
EXPECT_EQ(video_track->CountSinks(), 0u);
}
// crbug.com/1153092: flaky on several platforms.
TEST_F(MediaStreamVideoTrackUnderlyingSourceTest, DISABLED_FramesAreDropped) {
TEST_F(MediaStreamVideoTrackUnderlyingSourceTest,
DropOldFramesWhenQueueIsFull) {
V8TestingScope v8_scope;
ScriptState* script_state = v8_scope.GetScriptState();
auto* source = CreateSource(script_state);
// Create a stream, to ensure there is a controller associated to the source.
ReadableStream::CreateWithCountQueueingStrategy(v8_scope.GetScriptState(),
source, 0);
// The controller initially has no frames.
EXPECT_EQ(source->Controller()->DesiredSize(), 0);
const wtf_size_t buffer_size = 5;
auto* source = CreateSource(script_state, buffer_size);
EXPECT_EQ(source->MaxQueueSize(), buffer_size);
// Create a stream to ensure there is a controller associated to the source.
ReadableStream::CreateWithCountQueueingStrategy(script_state, source, 0);
// Add a sink to the track to make it possible to wait until a pushed frame
// is delivered to sinks, including |source|, which is a sink of the track.
MockMediaStreamVideoSink mock_sink;
mock_sink.ConnectToTrack(WebMediaStreamTrack(source->Track()));
auto push_frame_sync = [&mock_sink, this](const base::TimeDelta timestamp) {
base::RunLoop sink_loop;
EXPECT_CALL(mock_sink, OnVideoFrame(_))
.WillOnce(base::test::RunOnceClosure(sink_loop.QuitClosure()));
PushFrame(timestamp);
sink_loop.Run();
};
const auto& queue = source->QueueForTesting();
for (wtf_size_t i = 0; i < buffer_size; ++i) {
EXPECT_EQ(queue.size(), i);
base::TimeDelta timestamp = base::TimeDelta::FromSeconds(i);
push_frame_sync(timestamp);
EXPECT_EQ(queue.back()->timestamp(), timestamp);
EXPECT_EQ(queue.front()->timestamp(), base::TimeDelta::FromSeconds(0));
}
// Push a frame. DesiredSize() decreases since the frame is not consumed.
PushFrame();
EXPECT_EQ(source->Controller()->DesiredSize(), -1);
// Push another frame while the queue is full.
EXPECT_EQ(queue.size(), buffer_size);
push_frame_sync(base::TimeDelta::FromSeconds(buffer_size));
// Since the queue was full, the oldest frame from the queue should have been
// dropped.
EXPECT_EQ(queue.size(), buffer_size);
EXPECT_EQ(queue.back()->timestamp(),
base::TimeDelta::FromSeconds(buffer_size));
EXPECT_EQ(queue.front()->timestamp(), base::TimeDelta::FromSeconds(1));
// Pulling with frames in the queue should move the oldest frame in the queue
// to the stream's controller.
EXPECT_EQ(source->DesiredSizeForTesting(), 0);
EXPECT_FALSE(source->IsPendingPullForTesting());
source->pull(script_state);
EXPECT_EQ(source->DesiredSizeForTesting(), -1);
EXPECT_FALSE(source->IsPendingPullForTesting());
EXPECT_EQ(queue.size(), buffer_size - 1);
EXPECT_EQ(queue.front()->timestamp(), base::TimeDelta::FromSeconds(2));
// Push an extra frame. DesiredSize() does not change this time because the
// frame is dropped.
PushFrame();
EXPECT_EQ(source->Controller()->DesiredSize(), -1);
source->Close();
EXPECT_EQ(queue.size(), 0u);
}
TEST_F(MediaStreamVideoTrackUnderlyingSourceTest,
BypassQueueAfterPullWithEmptyBuffer) {
V8TestingScope v8_scope;
ScriptState* script_state = v8_scope.GetScriptState();
auto* source = CreateSource(script_state);
// Create a stream to ensure there is a controller associated to the source.
ReadableStream::CreateWithCountQueueingStrategy(script_state, source, 0);
MockMediaStreamVideoSink mock_sink;
mock_sink.ConnectToTrack(WebMediaStreamTrack(source->Track()));
auto push_frame_sync = [&mock_sink, this]() {
base::RunLoop sink_loop;
EXPECT_CALL(mock_sink, OnVideoFrame(_))
.WillOnce(base::test::RunOnceClosure(sink_loop.QuitClosure()));
PushFrame();
sink_loop.Run();
};
// At first, the queue is empty and the desired size is empty as well.
EXPECT_TRUE(source->QueueForTesting().empty());
EXPECT_EQ(source->DesiredSizeForTesting(), 0);
EXPECT_FALSE(source->IsPendingPullForTesting());
source->pull(script_state);
EXPECT_TRUE(source->QueueForTesting().empty());
EXPECT_EQ(source->DesiredSizeForTesting(), 0);
EXPECT_TRUE(source->IsPendingPullForTesting());
push_frame_sync();
// Since a pull was pending, the frame is put directly in the stream
// controller, bypassing the source queue.
EXPECT_TRUE(source->QueueForTesting().empty());
EXPECT_EQ(source->DesiredSizeForTesting(), -1);
EXPECT_FALSE(source->IsPendingPullForTesting());
source->Close();
}
TEST_F(MediaStreamVideoTrackUnderlyingSourceTest, QueueSizeCannotBeZero) {
V8TestingScope v8_scope;
ScriptState* script_state = v8_scope.GetScriptState();
auto* source = CreateSource(script_state, 0u);
// Queue size is always at least 1, even if 0 is requested.
EXPECT_EQ(source->MaxQueueSize(), 1u);
source->Close();
}
} // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment