Commit ce8b9f68 authored by Siddhartha S's avatar Siddhartha S Committed by Commit Bot

tracing: Consumer host will write / wait to write trace buffer in background

Writing to consumer client data pipe in main thread can cause us to wait
for the pipe to be ready on main thread. The main thread need to run
other work like giving out SMBs to processes. It can also cause deadlock
if the consumer client is also waiting for SMB. So, move the writer to
background task runner.

BUG=953837

Change-Id: I5cad2c45af8b4c7f0a69372e1c2633896c7e518e
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1573162
Commit-Queue: ssid <ssid@chromium.org>
Reviewed-by: default avataroysteine <oysteine@chromium.org>
Cr-Commit-Position: refs/heads/master@{#652725}
parent 7d94539f
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "base/strings/strcat.h" #include "base/strings/strcat.h"
#include "base/strings/string_number_conversions.h" #include "base/strings/string_number_conversions.h"
#include "base/strings/string_util.h" #include "base/strings/string_util.h"
#include "base/task/post_task.h"
#include "build/build_config.h" #include "build/build_config.h"
#include "mojo/public/cpp/bindings/strong_binding.h" #include "mojo/public/cpp/bindings/strong_binding.h"
#include "mojo/public/cpp/system/wait.h" #include "mojo/public/cpp/system/wait.h"
...@@ -41,6 +42,70 @@ bool StringToProcessId(const std::string& input, base::ProcessId* output) { ...@@ -41,6 +42,70 @@ bool StringToProcessId(const std::string& input, base::ProcessId* output) {
} // namespace } // namespace
class ConsumerHost::StreamWriter {
public:
using Slices = std::vector<std::string>;
static scoped_refptr<base::SequencedTaskRunner> CreateTaskRunner() {
return base::CreateSequencedTaskRunnerWithTraits(
{base::WithBaseSyncPrimitives(), base::TaskPriority::BEST_EFFORT});
}
StreamWriter(mojo::ScopedDataPipeProducerHandle stream,
ReadBuffersCallback callback,
base::OnceClosure disconnect_callback,
scoped_refptr<base::SequencedTaskRunner> callback_task_runner)
: stream_(std::move(stream)),
read_buffers_callback_(std::move(callback)),
disconnect_callback_(std::move(disconnect_callback)),
callback_task_runner_(callback_task_runner) {}
void WriteToStream(std::unique_ptr<Slices> slices, bool has_more) {
DCHECK(stream_.is_valid());
for (const auto& slice : *slices) {
uint32_t write_position = 0;
while (write_position < slice.size()) {
uint32_t write_bytes = slice.size() - write_position;
MojoResult result =
stream_->WriteData(slice.data() + write_position, &write_bytes,
MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK) {
write_position += write_bytes;
continue;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
result = mojo::Wait(stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE);
}
if (result != MOJO_RESULT_OK) {
if (!disconnect_callback_.is_null()) {
callback_task_runner_->PostTask(FROM_HERE,
std::move(disconnect_callback_));
}
return;
}
}
}
if (!has_more && !read_buffers_callback_.is_null()) {
callback_task_runner_->PostTask(FROM_HERE,
std::move(read_buffers_callback_));
}
}
private:
mojo::ScopedDataPipeProducerHandle stream_;
ReadBuffersCallback read_buffers_callback_;
base::OnceClosure disconnect_callback_;
scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;
DISALLOW_COPY_AND_ASSIGN(StreamWriter);
};
// static // static
bool ConsumerHost::ParsePidFromProducerName(const std::string& producer_name, bool ConsumerHost::ParsePidFromProducerName(const std::string& producer_name,
base::ProcessId* pid) { base::ProcessId* pid) {
...@@ -164,8 +229,12 @@ void ConsumerHost::Flush(uint32_t timeout, ...@@ -164,8 +229,12 @@ void ConsumerHost::Flush(uint32_t timeout,
void ConsumerHost::ReadBuffers(mojo::ScopedDataPipeProducerHandle stream, void ConsumerHost::ReadBuffers(mojo::ScopedDataPipeProducerHandle stream,
ReadBuffersCallback callback) { ReadBuffersCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
read_buffers_stream_ = std::move(stream);
read_buffers_callback_ = std::move(callback); read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
base::BindOnce(&ConsumerHost::OnConsumerClientDisconnected,
weak_factory_.GetWeakPtr()),
base::SequencedTaskRunnerHandle::Get());
consumer_endpoint_->ReadBuffers(); consumer_endpoint_->ReadBuffers();
} }
...@@ -175,11 +244,13 @@ void ConsumerHost::DisableTracingAndEmitJson( ...@@ -175,11 +244,13 @@ void ConsumerHost::DisableTracingAndEmitJson(
mojo::ScopedDataPipeProducerHandle stream, mojo::ScopedDataPipeProducerHandle stream,
DisableTracingAndEmitJsonCallback callback) { DisableTracingAndEmitJsonCallback callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(!read_buffers_stream_ && !read_buffers_callback_ && DCHECK(!read_buffers_stream_writer_);
!json_trace_exporter_);
read_buffers_stream_ = std::move(stream); read_buffers_stream_writer_ = base::SequenceBound<StreamWriter>(
read_buffers_callback_ = std::move(callback); StreamWriter::CreateTaskRunner(), std::move(stream), std::move(callback),
base::BindOnce(&ConsumerHost::OnConsumerClientDisconnected,
weak_factory_.GetWeakPtr()),
base::SequencedTaskRunnerHandle::Get());
// TODO(eseckler): Support argument/metadata filtering. // TODO(eseckler): Support argument/metadata filtering.
json_trace_exporter_ = std::make_unique<TrackEventJSONExporter>( json_trace_exporter_ = std::make_unique<TrackEventJSONExporter>(
...@@ -239,22 +310,21 @@ void ConsumerHost::OnTraceData(std::vector<perfetto::TracePacket> packets, ...@@ -239,22 +310,21 @@ void ConsumerHost::OnTraceData(std::vector<perfetto::TracePacket> packets,
return; return;
} }
auto copy = std::make_unique<StreamWriter::Slices>();
for (auto& packet : packets) { for (auto& packet : packets) {
char* data; char* data;
size_t size; size_t size;
std::tie(data, size) = packet.GetProtoPreamble(); std::tie(data, size) = packet.GetProtoPreamble();
WriteToStream(data, size); copy->emplace_back(data, size);
auto& slices = packet.slices(); auto& slices = packet.slices();
for (auto& slice : slices) { for (auto& slice : slices) {
WriteToStream(slice.start, slice.size); copy->emplace_back(static_cast<const char*>(slice.start), slice.size);
} }
} }
read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream,
std::move(copy), has_more);
if (!has_more) { if (!has_more) {
read_buffers_stream_.reset(); read_buffers_stream_writer_.Reset();
if (read_buffers_callback_) {
std::move(read_buffers_callback_).Run();
}
} }
} }
...@@ -357,54 +427,23 @@ void ConsumerHost::OnTraceStats(bool success, ...@@ -357,54 +427,23 @@ void ConsumerHost::OnTraceStats(bool success,
std::move(request_buffer_usage_callback_).Run(true, percent_full); std::move(request_buffer_usage_callback_).Run(true, percent_full);
} }
void ConsumerHost::OnJSONTraceData(const std::string& json, void ConsumerHost::OnJSONTraceData(std::string* json,
base::DictionaryValue* metadata, base::DictionaryValue* metadata,
bool has_more) { bool has_more) {
WriteToStream(json.data(), json.size()); auto slices = std::make_unique<StreamWriter::Slices>();
slices->push_back(std::string());
slices->back().swap(*json);
read_buffers_stream_writer_.Post(FROM_HERE, &StreamWriter::WriteToStream,
std::move(slices), has_more);
if (has_more) { if (!has_more) {
return; read_buffers_stream_writer_.Reset();
}
read_buffers_stream_.reset();
if (read_buffers_callback_) {
std::move(read_buffers_callback_).Run();
} }
} }
void ConsumerHost::WriteToStream(const void* start, size_t size) { void ConsumerHost::OnConsumerClientDisconnected() {
TRACE_EVENT0("ipc", "ConsumerHost::WriteToStream"); // Bail out; destination handle got closed.
DCHECK(read_buffers_stream_.is_valid()); consumer_endpoint_->FreeBuffers();
uint32_t write_position = 0;
while (write_position < size) {
uint32_t write_bytes = size - write_position;
MojoResult result = read_buffers_stream_->WriteData(
static_cast<const uint8_t*>(start) + write_position, &write_bytes,
MOJO_WRITE_DATA_FLAG_NONE);
if (result == MOJO_RESULT_OK) {
write_position += write_bytes;
continue;
}
if (result == MOJO_RESULT_SHOULD_WAIT) {
// TODO(oysteine): If we end up actually blocking here it means
// the client is consuming data slower than Perfetto is producing
// it. Consider other solutions at that point because it means
// eventually Producers will run out of chunks and will stall waiting
// for new ones.
result =
mojo::Wait(read_buffers_stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE);
}
if (result != MOJO_RESULT_OK) {
// Bail out; destination handle got closed.
consumer_endpoint_->FreeBuffers();
return;
}
}
} }
} // namespace tracing } // namespace tracing
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "base/macros.h" #include "base/macros.h"
#include "base/memory/weak_ptr.h" #include "base/memory/weak_ptr.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/threading/sequence_bound.h"
#include "base/timer/timer.h" #include "base/timer/timer.h"
#include "mojo/public/cpp/bindings/binding.h" #include "mojo/public/cpp/bindings/binding.h"
#include "services/tracing/public/mojom/perfetto_service.mojom.h" #include "services/tracing/public/mojom/perfetto_service.mojom.h"
...@@ -81,19 +82,20 @@ class ConsumerHost : public perfetto::Consumer, public mojom::ConsumerHost { ...@@ -81,19 +82,20 @@ class ConsumerHost : public perfetto::Consumer, public mojom::ConsumerHost {
void OnActiveServicePidsInitialized(); void OnActiveServicePidsInitialized();
private: private:
class StreamWriter;
perfetto::TraceConfig AdjustTraceConfig( perfetto::TraceConfig AdjustTraceConfig(
const perfetto::TraceConfig& trace_config); const perfetto::TraceConfig& trace_config);
void OnEnableTracingTimeout(); void OnEnableTracingTimeout();
void MaybeSendEnableTracingAck(); void MaybeSendEnableTracingAck();
bool IsExpectedPid(base::ProcessId pid) const; bool IsExpectedPid(base::ProcessId pid) const;
void OnJSONTraceData(const std::string& json, void OnJSONTraceData(std::string* json,
base::DictionaryValue* metadata, base::DictionaryValue* metadata,
bool has_more); bool has_more);
void WriteToStream(const void* start, size_t size); void OnConsumerClientDisconnected();
PerfettoService* const service_; PerfettoService* const service_;
mojo::ScopedDataPipeProducerHandle read_buffers_stream_; base::SequenceBound<StreamWriter> read_buffers_stream_writer_;
ReadBuffersCallback read_buffers_callback_;
base::OnceCallback<void(bool)> flush_callback_; base::OnceCallback<void(bool)> flush_callback_;
mojom::TracingSessionPtr tracing_session_; mojom::TracingSessionPtr tracing_session_;
std::set<base::ProcessId> filtered_pids_; std::set<base::ProcessId> filtered_pids_;
......
...@@ -247,6 +247,14 @@ class ThreadedPerfettoService : public mojom::TracingSession { ...@@ -247,6 +247,14 @@ class ThreadedPerfettoService : public mojom::TracingSession {
return config; return config;
} }
void ClearConsumer() {
base::RunLoop wait_loop;
task_runner_->PostTaskAndReply(
FROM_HERE, base::BindLambdaForTesting([&]() { consumer_.reset(); }),
wait_loop.QuitClosure());
wait_loop.Run();
}
private: private:
scoped_refptr<base::SequencedTaskRunner> task_runner_; scoped_refptr<base::SequencedTaskRunner> task_runner_;
std::unique_ptr<PerfettoService> perfetto_service_; std::unique_ptr<PerfettoService> perfetto_service_;
...@@ -395,6 +403,26 @@ TEST_F(TracingConsumerTest, ReceiveTestPackets) { ...@@ -395,6 +403,26 @@ TEST_F(TracingConsumerTest, ReceiveTestPackets) {
EXPECT_EQ(10u, matching_packet_count()); EXPECT_EQ(10u, matching_packet_count());
} }
TEST_F(TracingConsumerTest, DeleteConsumerWhenReceiving) {
EnableTracingWithDataSourceName(mojom::kTraceEventDataSourceName);
base::RunLoop wait_for_tracing_start;
threaded_perfetto_service()->CreateProducer(
mojom::kTraceEventDataSourceName, 100u,
wait_for_tracing_start.QuitClosure());
wait_for_tracing_start.Run();
base::RunLoop no_more_data;
ExpectPackets(kPerfettoTestString, no_more_data.QuitClosure());
threaded_perfetto_service()->DisableTracing();
ReadBuffers();
threaded_perfetto_service()->ClearConsumer();
no_more_data.Run();
}
TEST_F(TracingConsumerTest, FlushProducers) { TEST_F(TracingConsumerTest, FlushProducers) {
EnableTracingWithDataSourceName(mojom::kTraceEventDataSourceName); EnableTracingWithDataSourceName(mojom::kTraceEventDataSourceName);
......
...@@ -440,10 +440,11 @@ void JSONTraceExporter::StringBuffer::EscapeJSONAndAppend( ...@@ -440,10 +440,11 @@ void JSONTraceExporter::StringBuffer::EscapeJSONAndAppend(
void JSONTraceExporter::StringBuffer::Flush(base::DictionaryValue* metadata, void JSONTraceExporter::StringBuffer::Flush(base::DictionaryValue* metadata,
bool has_more) { bool has_more) {
callback_.Run(out_, metadata, has_more); callback_.Run(&out_, metadata, has_more);
if (has_more) { if (has_more) {
// We clear |out_| because we've processed all the current data in |out_| // We clear |out_| because we've processed all the current data in |out_|
// and we don't want any data to be repeated. We have to protect this by // and we don't want any data to be repeated. The callback should have moved
// all the contents, but clear it to be safe. We have to protect this by
// checking |has_more| because the callback could have deleted |this| in // checking |has_more| because the callback could have deleted |this| in
// which cause |out_| is a destroyed as well. // which cause |out_| is a destroyed as well.
out_.clear(); out_.clear();
......
...@@ -55,10 +55,8 @@ class JSONTraceExporter { ...@@ -55,10 +55,8 @@ class JSONTraceExporter {
using MetadataFilterPredicate = using MetadataFilterPredicate =
base::RepeatingCallback<bool(const std::string& metadata_name)>; base::RepeatingCallback<bool(const std::string& metadata_name)>;
using OnTraceEventJSONCallback = using OnTraceEventJSONCallback = base::RepeatingCallback<
base::RepeatingCallback<void(const std::string& json, void(std::string* json, base::DictionaryValue* metadata, bool has_more)>;
base::DictionaryValue* metadata,
bool has_more)>;
JSONTraceExporter(ArgumentFilterPredicate argument_filter_predicate, JSONTraceExporter(ArgumentFilterPredicate argument_filter_predicate,
MetadataFilterPredicate metadata_filter_predicate, MetadataFilterPredicate metadata_filter_predicate,
......
...@@ -195,11 +195,12 @@ class JsonTraceExporterTest : public testing::Test { ...@@ -195,11 +195,12 @@ class JsonTraceExporterTest : public testing::Test {
base::BindRepeating(&JsonTraceExporterTest::OnTraceEventJSON, base::BindRepeating(&JsonTraceExporterTest::OnTraceEventJSON,
base::Unretained(this)))) {} base::Unretained(this)))) {}
void OnTraceEventJSON(const std::string& json, void OnTraceEventJSON(std::string* json,
base::DictionaryValue* metadata, base::DictionaryValue* metadata,
bool has_more) { bool has_more) {
unparsed_trace_data_ += json; unparsed_trace_data_ += *json;
unparsed_trace_data_sequence_.push_back(json); unparsed_trace_data_sequence_.push_back(std::string());
unparsed_trace_data_sequence_.back().swap(*json);
if (has_more) { if (has_more) {
return; return;
} }
...@@ -207,7 +208,7 @@ class JsonTraceExporterTest : public testing::Test { ...@@ -207,7 +208,7 @@ class JsonTraceExporterTest : public testing::Test {
base::JSONReader::ReadDeprecated(unparsed_trace_data_)); base::JSONReader::ReadDeprecated(unparsed_trace_data_));
EXPECT_TRUE(parsed_trace_data_); EXPECT_TRUE(parsed_trace_data_);
if (!parsed_trace_data_) { if (!parsed_trace_data_) {
LOG(ERROR) << "Couldn't parse json: \n" << json; LOG(ERROR) << "Couldn't parse json: \n" << unparsed_trace_data_;
} }
// The TraceAnalyzer expects the raw trace output, without the // The TraceAnalyzer expects the raw trace output, without the
......
...@@ -124,11 +124,11 @@ class PerfettoTracingCoordinator::TracingSession : public perfetto::Consumer { ...@@ -124,11 +124,11 @@ class PerfettoTracingCoordinator::TracingSession : public perfetto::Consumer {
consumer_endpoint_->DisableTracing(); consumer_endpoint_->DisableTracing();
} }
void OnJSONTraceEventCallback(const std::string& json, void OnJSONTraceEventCallback(std::string* json,
base::DictionaryValue* metadata, base::DictionaryValue* metadata,
bool has_more) { bool has_more) {
if (stream_.is_valid()) { if (stream_.is_valid()) {
mojo::BlockingCopyFromString(json, stream_); mojo::BlockingCopyFromString(*json, stream_);
} }
if (!has_more) { if (!has_more) {
......
...@@ -54,15 +54,16 @@ class TrackEventJsonExporterTest : public testing::Test { ...@@ -54,15 +54,16 @@ class TrackEventJsonExporterTest : public testing::Test {
void TearDown() override { json_trace_exporter_.reset(); } void TearDown() override { json_trace_exporter_.reset(); }
void OnTraceEventJson(const std::string& json, void OnTraceEventJson(std::string* json,
base::DictionaryValue* metadata, base::DictionaryValue* metadata,
bool has_more) { bool has_more) {
CHECK(!has_more); CHECK(!has_more);
unparsed_trace_data_ = json; unparsed_trace_data_.swap(*json);
parsed_trace_data_ = parsed_trace_data_ = base::DictionaryValue::From(
base::DictionaryValue::From(base::JSONReader::ReadDeprecated(json)); base::JSONReader::ReadDeprecated(unparsed_trace_data_));
ASSERT_TRUE(parsed_trace_data_) << "Couldn't parse json: \n" << json; ASSERT_TRUE(parsed_trace_data_) << "Couldn't parse json: \n"
<< unparsed_trace_data_;
// The TraceAnalyzer expects the raw trace output, without the // The TraceAnalyzer expects the raw trace output, without the
// wrapping root-node. // wrapping root-node.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment