Commit 6c0be36c authored by Yutaka Hirano's avatar Yutaka Hirano Committed by Commit Bot

[WebSocket] Avoid frequent buffer deallocation

Tracing suggests WTF::Partitions::FastFree is slow. On my workstation
deallocating 260 segments takes more than 100us. This CL stops using
SharedBuffer and introduces WebSocketMessageChunkAccumulator which pools
segments. A WebSocketMessageChunkAccumulator will free segments which
were not used in the last 100ms.

This improves [1] in my environment.

Without the change: 390MB/s
With the change   : 416MB/s

With --websocket-renderer-receive-quota-max=128000
     --websocket-read-buffer-size=32000, the result is

Without the change: 494MB/s
With the change   : 563MB/s

1: third_party/blink/perf_tests/websocket/receive-arraybuffer-1MBx100.html

Bug: 865001
Change-Id: Id87134e766b662816249b4a08e24cf844bb4a88f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1725351
Commit-Queue: Yutaka Hirano <yhirano@chromium.org>
Reviewed-by: default avatarKentaro Hara <haraken@chromium.org>
Reviewed-by: default avatarAdam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/master@{#689421}
parent f5e3d066
......@@ -427,6 +427,7 @@ jumbo_source_set("unit_tests") {
"websockets/mock_websocket_channel.h",
"websockets/websocket_channel_impl_test.cc",
"websockets/websocket_common_test.cc",
"websockets/websocket_message_chunk_accumulator_test.cc",
"worklet/animation_and_paint_worklet_thread_test.cc",
"worklet/worklet_thread_test_common.cc",
"worklet/worklet_thread_test_common.h",
......
......@@ -25,5 +25,7 @@ blink_modules_sources("websockets") {
"websocket_handle_client.h",
"websocket_handle_impl.cc",
"websocket_handle_impl.h",
"websocket_message_chunk_accumulator.cc",
"websocket_message_chunk_accumulator.h",
]
}
......@@ -194,6 +194,7 @@ WebSocketChannelImpl::WebSocketChannelImpl(
: handle_(std::move(handle)),
client_(client),
identifier_(CreateUniqueIdentifier()),
message_chunks_(execution_context->GetTaskRunner(TaskType::kNetworking)),
execution_context_(execution_context),
location_at_construction_(std::move(location)),
file_reading_task_runner_(
......@@ -684,15 +685,14 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
switch (type) {
case WebSocketHandle::kMessageTypeText:
DCHECK(!receiving_message_data_);
DCHECK_EQ(message_chunks_.GetSize(), 0u);
receiving_message_type_is_text_ = true;
break;
case WebSocketHandle::kMessageTypeBinary:
DCHECK(!receiving_message_data_);
DCHECK_EQ(message_chunks_.GetSize(), 0u);
receiving_message_type_is_text_ = false;
break;
case WebSocketHandle::kMessageTypeContinuation:
DCHECK(receiving_message_data_);
break;
}
......@@ -700,28 +700,20 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
if (!backpressure_)
AddReceiveFlowControlIfNecessary();
const size_t message_size_so_far =
(receiving_message_data_ ? receiving_message_data_->size() : 0) + size;
const size_t message_size_so_far = message_chunks_.GetSize();
if (message_size_so_far > std::numeric_limits<wtf_size_t>::max()) {
receiving_message_data_ = nullptr;
message_chunks_.Clear();
FailAsError("Message size is too large.");
return;
}
if (!fin) {
if (!receiving_message_data_) {
receiving_message_data_ = SharedBuffer::Create();
}
receiving_message_data_->Append(data, size);
message_chunks_.Append(base::make_span(data, size));
return;
}
const wtf_size_t message_size = static_cast<wtf_size_t>(message_size_so_far);
Vector<base::span<const char>> chunks;
if (receiving_message_data_) {
chunks.AppendRange(receiving_message_data_->begin(),
receiving_message_data_->end());
}
Vector<base::span<const char>> chunks = message_chunks_.GetView();
if (size > 0) {
chunks.push_back(base::make_span(data, size));
}
......@@ -754,7 +746,7 @@ void WebSocketChannelImpl::DidReceiveData(WebSocketHandle* handle,
} else {
client_->DidReceiveBinaryMessage(chunks);
}
receiving_message_data_ = nullptr;
message_chunks_.Clear();
}
void WebSocketChannelImpl::DidClose(WebSocketHandle* handle,
......
......@@ -42,6 +42,7 @@
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/modules/websockets/websocket_channel.h"
#include "third_party/blink/renderer/modules/websockets/websocket_handle.h"
#include "third_party/blink/renderer/modules/websockets/websocket_message_chunk_accumulator.h"
#include "third_party/blink/renderer/platform/heap/handle.h"
#include "third_party/blink/renderer/platform/scheduler/public/frame_scheduler.h"
#include "third_party/blink/renderer/platform/weborigin/kurl.h"
......@@ -229,7 +230,7 @@ class MODULES_EXPORT WebSocketChannelImpl final : public WebSocketChannel {
uint64_t identifier_;
Member<BlobLoader> blob_loader_;
HeapDeque<Member<Message>> messages_;
scoped_refptr<SharedBuffer> receiving_message_data_;
WebSocketMessageChunkAccumulator message_chunks_;
const Member<ExecutionContext> execution_context_;
bool backpressure_ = false;
......
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/websockets/websocket_message_chunk_accumulator.h"
#include <string.h>
#include <algorithm>
namespace blink {
constexpr size_t WebSocketMessageChunkAccumulator::kSegmentSize;
constexpr base::TimeDelta WebSocketMessageChunkAccumulator::kFreeDelay;
WebSocketMessageChunkAccumulator::WebSocketMessageChunkAccumulator(
scoped_refptr<base::SingleThreadTaskRunner> task_runner)
: timer_(std::move(task_runner),
this,
&WebSocketMessageChunkAccumulator::OnTimerFired) {}
WebSocketMessageChunkAccumulator::~WebSocketMessageChunkAccumulator() = default;
void WebSocketMessageChunkAccumulator::Append(base::span<const char> data) {
if (!segments_.IsEmpty()) {
const size_t to_be_written =
std::min(data.size(), kSegmentSize - GetLastSegmentSize());
memcpy(segments_.back().get() + GetLastSegmentSize(), data.data(),
to_be_written);
data = data.subspan(to_be_written);
size_ += to_be_written;
}
while (!data.empty()) {
SegmentPtr segment_ptr;
if (pool_.IsEmpty()) {
segment_ptr = CreateSegment();
} else {
segment_ptr = std::move(pool_.back());
pool_.pop_back();
}
const size_t to_be_written = std::min(data.size(), kSegmentSize);
memcpy(segment_ptr.get(), data.data(), to_be_written);
data = data.subspan(to_be_written);
size_ += to_be_written;
segments_.push_back(std::move(segment_ptr));
}
}
Vector<base::span<const char>> WebSocketMessageChunkAccumulator::GetView()
const {
Vector<base::span<const char>> view;
if (segments_.IsEmpty()) {
return view;
}
view.ReserveCapacity(segments_.size());
for (wtf_size_t i = 0; i < segments_.size() - 1; ++i) {
view.push_back(base::make_span(segments_[i].get(), kSegmentSize));
}
view.push_back(base::make_span(segments_.back().get(), GetLastSegmentSize()));
return view;
}
void WebSocketMessageChunkAccumulator::Clear() {
num_pooled_segments_to_be_removed_ =
std::min(num_pooled_segments_to_be_removed_, pool_.size());
size_ = 0;
pool_.ReserveCapacity(pool_.size() + segments_.size());
for (auto& segment : segments_) {
pool_.push_back(std::move(segment));
}
segments_.clear();
if (timer_.IsActive()) {
return;
}
// We will remove all the segments if no one uses them in the near future.
num_pooled_segments_to_be_removed_ = pool_.size();
if (num_pooled_segments_to_be_removed_ > 0) {
timer_.StartOneShot(kFreeDelay, FROM_HERE);
}
}
void WebSocketMessageChunkAccumulator::OnTimerFired(TimerBase*) {
DCHECK(!timer_.IsActive());
const auto to_be_removed =
std::min(num_pooled_segments_to_be_removed_, pool_.size());
pool_.EraseAt(pool_.size() - to_be_removed, to_be_removed);
// We will remove all the segments if no one uses them in the near future.
num_pooled_segments_to_be_removed_ = pool_.size();
if (num_pooled_segments_to_be_removed_ > 0) {
timer_.StartOneShot(kFreeDelay, FROM_HERE);
}
}
} // namespace blink
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_MESSAGE_CHUNK_ACCUMULATOR_H_
#define THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_MESSAGE_CHUNK_ACCUMULATOR_H_
#include <memory>
#include "base/containers/span.h"
#include "base/memory/scoped_refptr.h"
#include "base/time/time.h"
#include "third_party/blink/renderer/modules/modules_export.h"
#include "third_party/blink/renderer/platform/timer.h"
#include "third_party/blink/renderer/platform/wtf/allocator/allocator.h"
#include "third_party/blink/renderer/platform/wtf/vector.h"
#include "third_party/blink/renderer/platform/wtf/wtf_size_t.h"
namespace blink {
class SingleThreadTaskRunner;
// WebSocketMessageChunkAccumulator stores chunks for one WebSocket message. A
// user can call Append() to append bytes, and call GetView() to get a list of
// base::spans of data previously stored.
// We don't use SharedBuffer due to an observed performance problem of FastFree.
// TODO(yhirano): Remove this once the performance problem is fixed in a general
// manner.
class MODULES_EXPORT WebSocketMessageChunkAccumulator final {
DISALLOW_NEW();
public:
explicit WebSocketMessageChunkAccumulator(
scoped_refptr<base::SingleThreadTaskRunner> task_runner);
~WebSocketMessageChunkAccumulator();
// Appends |data| to this instance.
void Append(base::span<const char> data);
// Returns the number of bytes stored in this instance.
size_t GetSize() const { return size_; }
// Clears the stored data. Memory regions for chunks may be kept for future
// uses for certain amount of time.
void Clear();
// The regions will be available until Clear() is called.
Vector<base::span<const char>> GetView() const;
wtf_size_t GetPoolSizeForTesting() const { return pool_.size(); }
bool IsTimerActiveForTesting() const { return timer_.IsActive(); }
static constexpr size_t kSegmentSize = 16 * 1024;
static constexpr base::TimeDelta kFreeDelay =
base::TimeDelta::FromMilliseconds(100);
private:
struct SegmentDeleter {
void operator()(char* p) const { WTF::Partitions::FastFree(p); }
};
using SegmentPtr = std::unique_ptr<char[], SegmentDeleter>;
static SegmentPtr CreateSegment() {
return SegmentPtr(static_cast<char*>(WTF::Partitions::FastMalloc(
kSegmentSize, "blink::WebSocketMessageChunkAccumulator::Segment")));
}
void OnTimerFired(TimerBase*);
size_t GetLastSegmentSize() const {
DCHECK(!segments_.IsEmpty());
return size_ % kSegmentSize > 0 ? size_ % kSegmentSize : kSegmentSize;
}
Vector<SegmentPtr> segments_;
Vector<SegmentPtr> pool_;
size_t size_ = 0;
wtf_size_t num_pooled_segments_to_be_removed_ = 0;
TaskRunnerTimer<WebSocketMessageChunkAccumulator> timer_;
};
} // namespace blink
#endif // THIRD_PARTY_BLINK_RENDERER_MODULES_WEBSOCKETS_WEBSOCKET_MESSAGE_CHUNK_ACCUMULATOR_H_
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "third_party/blink/renderer/modules/websockets/websocket_message_chunk_accumulator.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/blink/renderer/platform/scheduler/test/fake_task_runner.h"
namespace blink {
namespace {
class WebSocketMessageChunkAccumulatorTest : public testing::Test {
public:
using FakeTaskRunner = scheduler::FakeTaskRunner;
static Vector<char> Flatten(const Vector<base::span<const char>>& chunks) {
Vector<char> v;
for (const auto& chunk : chunks) {
v.Append(chunk.data(), chunk.size());
}
return v;
}
static constexpr auto kSegmentSize =
WebSocketMessageChunkAccumulator::kSegmentSize;
static constexpr auto kFreeDelay =
WebSocketMessageChunkAccumulator::kFreeDelay;
};
constexpr size_t WebSocketMessageChunkAccumulatorTest::kSegmentSize;
constexpr base::TimeDelta WebSocketMessageChunkAccumulatorTest::kFreeDelay;
TEST_F(WebSocketMessageChunkAccumulatorTest, Empty) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
EXPECT_EQ(chunks.GetSize(), 0u);
EXPECT_TRUE(chunks.GetView().IsEmpty());
}
TEST_F(WebSocketMessageChunkAccumulatorTest, Append) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
Vector<char> chunk(8, 'x');
chunks.Append(base::make_span(chunk));
EXPECT_EQ(chunks.GetSize(), chunk.size());
EXPECT_EQ(8u, chunks.GetSize());
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(chunks.GetView()[0].size(), 8u);
ASSERT_EQ(Flatten(chunks.GetView()), chunk);
}
TEST_F(WebSocketMessageChunkAccumulatorTest, AppendChunkWithInternalChunkSize) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
Vector<char> chunk(kSegmentSize, 'y');
chunks.Append(base::make_span(chunk));
EXPECT_EQ(chunks.GetSize(), chunk.size());
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(chunks.GetView()[0].size(), kSegmentSize);
ASSERT_EQ(Flatten(chunks.GetView()), chunk);
}
TEST_F(WebSocketMessageChunkAccumulatorTest, AppendLargeChunk) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
Vector<char> chunk(kSegmentSize * 2 + 2, 'y');
chunks.Append(base::make_span(chunk));
EXPECT_EQ(chunks.GetSize(), chunk.size());
ASSERT_EQ(chunks.GetView().size(), 3u);
ASSERT_EQ(chunks.GetView()[0].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[1].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[2].size(), 2u);
ASSERT_EQ(Flatten(chunks.GetView()), chunk);
}
TEST_F(WebSocketMessageChunkAccumulatorTest, AppendRepeatedly) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
Vector<char> chunk1(8, 'a');
Vector<char> chunk2(4, 'b');
Vector<char> chunk3; // empty
Vector<char> chunk4(kSegmentSize * 3 - 12, 'd');
Vector<char> chunk5(6, 'e');
Vector<char> chunk6(kSegmentSize - 5, 'f');
// This will grow over time.
Vector<char> expected;
chunks.Append(base::make_span(chunk1));
expected.AppendVector(chunk1);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(chunks.GetView()[0].size(), 8u);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
chunks.Append(base::make_span(chunk2));
expected.AppendVector(chunk2);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(chunks.GetView()[0].size(), 12u);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
chunks.Append(base::make_span(chunk3));
expected.AppendVector(chunk3);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(chunks.GetView()[0].size(), 12u);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
chunks.Append(base::make_span(chunk4));
expected.AppendVector(chunk4);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 3u);
ASSERT_EQ(chunks.GetView()[0].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[1].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[2].size(), kSegmentSize);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
chunks.Append(base::make_span(chunk5));
expected.AppendVector(chunk5);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 4u);
ASSERT_EQ(chunks.GetView()[0].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[1].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[2].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[3].size(), 6u);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
chunks.Append(base::make_span(chunk6));
expected.AppendVector(chunk6);
EXPECT_EQ(chunks.GetSize(), expected.size());
ASSERT_EQ(chunks.GetView().size(), 5u);
ASSERT_EQ(chunks.GetView()[0].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[1].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[2].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[3].size(), kSegmentSize);
ASSERT_EQ(chunks.GetView()[4].size(), 1u);
ASSERT_EQ(Flatten(chunks.GetView()), expected);
}
TEST_F(WebSocketMessageChunkAccumulatorTest, ClearAndAppend) {
WebSocketMessageChunkAccumulator chunks(
base::MakeRefCounted<FakeTaskRunner>());
Vector<char> chunk1(8, 'x');
Vector<char> chunk2(3, 'y');
chunks.Clear();
EXPECT_EQ(chunks.GetSize(), 0u);
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
chunks.Append(base::make_span(chunk1));
EXPECT_EQ(chunks.GetSize(), 8u);
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(Flatten(chunks.GetView()), chunk1);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
chunks.Clear();
EXPECT_EQ(chunks.GetSize(), 0u);
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 1u);
chunks.Append(base::make_span(chunk2));
EXPECT_EQ(chunks.GetSize(), 3u);
ASSERT_EQ(chunks.GetView().size(), 1u);
ASSERT_EQ(Flatten(chunks.GetView()), chunk2);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
}
TEST_F(WebSocketMessageChunkAccumulatorTest, ClearTimer) {
auto task_runner = base::MakeRefCounted<FakeTaskRunner>();
WebSocketMessageChunkAccumulator chunks(task_runner);
Vector<char> chunk1(kSegmentSize * 4, 'x');
Vector<char> chunk2(kSegmentSize * 3, 'x');
Vector<char> chunk3(kSegmentSize * 1, 'x');
// We don't start the timer because GetPoolSizeForTesting() is 0.
chunks.Clear();
EXPECT_FALSE(chunks.IsTimerActiveForTesting());
EXPECT_EQ(chunks.GetSize(), 0u);
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
chunks.Append(base::make_span(chunk1));
ASSERT_EQ(chunks.GetView().size(), 4u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
// We start the timer here.
// |num_pooled_segments_to_be_removed_| is 4.
chunks.Clear();
EXPECT_TRUE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 4u);
chunks.Append(base::make_span(chunk2));
ASSERT_EQ(chunks.GetView().size(), 3u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 1u);
// We don't start the timer because it's already active.
// |num_pooled_segments_to_be_removed_| is set to 1.
chunks.Clear();
EXPECT_TRUE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 4u);
// We remove 1 chunk from |pooled_segments_|.
// We start the timer because |num_pooled_segments_to_be_removed_| > 0.
task_runner->AdvanceTimeAndRun(kFreeDelay);
EXPECT_TRUE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 3u);
chunks.Append(base::make_span(chunk3));
ASSERT_EQ(chunks.GetView().size(), 1u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 2u);
// We remove 2 chunks from |pooled_segments_|.
// |num_pooled_segments_to_be_removed_| is 3 but we only have 2 pooled
// segments. We don't start the timer because we don't have pooled
// segments any more.
task_runner->AdvanceTimeAndRun(kFreeDelay);
EXPECT_FALSE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 1u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
// We start the timer here. num_pooled_segments_to_be_removed_ is set to 1.
chunks.Clear();
EXPECT_TRUE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 1u);
// We remove 1 chunk from |pooled_segments_|.
// We don't start the timer because we don't have pooled segments any more.
task_runner->AdvanceTimeAndRun(kFreeDelay);
EXPECT_FALSE(chunks.IsTimerActiveForTesting());
ASSERT_EQ(chunks.GetView().size(), 0u);
EXPECT_EQ(chunks.GetPoolSizeForTesting(), 0u);
}
} // namespace
} // namespace blink
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment