Commit 30b618cc authored by Eric Seckler's avatar Eric Seckler Committed by Commit Bot

tracing: Send bigger proto trace slices via mojo pipe in ConsumerHost

Previously, we were writing individual packet headers + payloads one by
one onto the mojo pipe. Internally, each write causes a memcpy into a
shmem buffer and a mojo notification to be sent to the Consumer to read
the data from the buffer.

This was probably not a big problem when a trace packet contained many
trace events, but now that each packet is only a single event (and
especially now that each COMPLETE event is split into separate
begin/end events, i.e. separate packets), this causes a lot of
overhead.

Instead, this patch modifies ConsumerHost to write a single slice per
ReadBuffer callback to the mojo pipe, which makes proto trace
collection significantly faster.

Bug: 1039726
Change-Id: I057c68ffafb99cbd2b41d6c6308391cc11cf1883
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1991448
Auto-Submit: Eric Seckler <eseckler@chromium.org>
Commit-Queue: Sami Kyöstilä <skyostil@chromium.org>
Reviewed-by: default avatarSami Kyöstilä <skyostil@chromium.org>
Cr-Commit-Position: refs/heads/master@{#730178}
parent 55979fb0
...@@ -84,7 +84,7 @@ class JsonStringOutputWriter ...@@ -84,7 +84,7 @@ class JsonStringOutputWriter
class ConsumerHost::StreamWriter { class ConsumerHost::StreamWriter {
public: public:
using Slices = std::vector<std::string>; using Slice = std::string;
static scoped_refptr<base::SequencedTaskRunner> CreateTaskRunner() { static scoped_refptr<base::SequencedTaskRunner> CreateTaskRunner() {
return base::CreateSequencedTaskRunner({base::ThreadPool(), return base::CreateSequencedTaskRunner({base::ThreadPool(),
...@@ -101,34 +101,32 @@ class ConsumerHost::StreamWriter { ...@@ -101,34 +101,32 @@ class ConsumerHost::StreamWriter {
disconnect_callback_(std::move(disconnect_callback)), disconnect_callback_(std::move(disconnect_callback)),
callback_task_runner_(callback_task_runner) {} callback_task_runner_(callback_task_runner) {}
void WriteToStream(std::unique_ptr<Slices> slices, bool has_more) { void WriteToStream(std::unique_ptr<Slice> slice, bool has_more) {
DCHECK(stream_.is_valid()); DCHECK(stream_.is_valid());
for (const auto& slice : *slices) {
uint32_t write_position = 0;
while (write_position < slice.size()) { uint32_t write_position = 0;
uint32_t write_bytes = slice.size() - write_position; while (write_position < slice->size()) {
uint32_t write_bytes = slice->size() - write_position;
MojoResult result = MojoResult result =
stream_->WriteData(slice.data() + write_position, &write_bytes, stream_->WriteData(slice->data() + write_position, &write_bytes,
MOJO_WRITE_DATA_FLAG_NONE); MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK) { if (result == MOJO_RESULT_OK) {
write_position += write_bytes; write_position += write_bytes;
continue; continue;
} }
if (result == MOJO_RESULT_SHOULD_WAIT) { if (result == MOJO_RESULT_SHOULD_WAIT) {
result = mojo::Wait(stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE); result = mojo::Wait(stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE);
} }
if (result != MOJO_RESULT_OK) { if (result != MOJO_RESULT_OK) {
if (!disconnect_callback_.is_null()) { if (!disconnect_callback_.is_null()) {
callback_task_runner_->PostTask(FROM_HERE, callback_task_runner_->PostTask(FROM_HERE,
std::move(disconnect_callback_)); std::move(disconnect_callback_));
}
return;
} }
return;
} }
} }
...@@ -465,11 +463,10 @@ void ConsumerHost::TracingSession::ExportJson() { ...@@ -465,11 +463,10 @@ void ConsumerHost::TracingSession::ExportJson() {
void ConsumerHost::TracingSession::OnJSONTraceData(std::string json, void ConsumerHost::TracingSession::OnJSONTraceData(std::string json,
bool has_more) { bool has_more) {
auto slices = std::make_unique<StreamWriter::Slices>(); auto slice = std::make_unique<StreamWriter::Slice>();
slices->push_back(std::string()); slice->swap(json);
slices->back().swap(json);
read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream, read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream,
std::move(slices), has_more); std::move(slice), has_more);
if (!has_more) { if (!has_more) {
read_buffers_stream_writer_.Reset(); read_buffers_stream_writer_.Reset();
...@@ -480,14 +477,15 @@ void ConsumerHost::TracingSession::OnTraceData( ...@@ -480,14 +477,15 @@ void ConsumerHost::TracingSession::OnTraceData(
std::vector<perfetto::TracePacket> packets, std::vector<perfetto::TracePacket> packets,
bool has_more) { bool has_more) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (trace_processor_) {
// Calculate space needed for trace chunk. Each packet has a preamble and
// payload size.
size_t max_size = packets.size() * perfetto::TracePacket::kMaxPreambleBytes;
for (const auto& packet : packets) {
max_size += packet.size();
}
// Calculate space needed for trace chunk. Each packet has a preamble and
// payload size.
size_t max_size = packets.size() * perfetto::TracePacket::kMaxPreambleBytes;
for (const auto& packet : packets) {
max_size += packet.size();
}
if (trace_processor_) {
// Copy packets into a trace file chunk. // Copy packets into a trace file chunk.
size_t position = 0; size_t position = 0;
std::unique_ptr<uint8_t[]> data(new uint8_t[max_size]); std::unique_ptr<uint8_t[]> data(new uint8_t[max_size]);
...@@ -518,19 +516,21 @@ void ConsumerHost::TracingSession::OnTraceData( ...@@ -518,19 +516,21 @@ void ConsumerHost::TracingSession::OnTraceData(
return; return;
} }
auto copy = std::make_unique<StreamWriter::Slices>(); // Copy packets into a trace slice.
auto chunk = std::make_unique<StreamWriter::Slice>();
chunk->reserve(max_size);
for (auto& packet : packets) { for (auto& packet : packets) {
char* data; char* data;
size_t size; size_t size;
std::tie(data, size) = packet.GetProtoPreamble(); std::tie(data, size) = packet.GetProtoPreamble();
copy->emplace_back(data, size); chunk->append(data, size);
auto& slices = packet.slices(); auto& slices = packet.slices();
for (auto& slice : slices) { for (auto& slice : slices) {
copy->emplace_back(static_cast<const char*>(slice.start), slice.size); chunk->append(static_cast<const char*>(slice.start), slice.size);
} }
} }
read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream, read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream,
std::move(copy), has_more); std::move(chunk), has_more);
if (!has_more) { if (!has_more) {
read_buffers_stream_writer_.Reset(); read_buffers_stream_writer_.Reset();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment