Commit 446a8163 authored by Yury Semikhatsky's avatar Yury Semikhatsky Committed by Commit Bot

[DevTools] Do not print pipe I/O errors when shutting down

When browser shutdown started pipe file descriptors are closed
on the UI thread while there may be outstanding reads and writes
on parallel threads. If such read/write operations fail error
messages will only be written to the console if pipe shutdown
has not begun yet.

Bug: none
Change-Id: I57e9b6c7f2921335cc2bb1f5289bd9347bba1f68
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2108825Reviewed-by: default avatarAndrey Kosyakov <caseq@chromium.org>
Reviewed-by: default avatarPavel Feldman <pfeldman@chromium.org>
Reviewed-by: default avatarDmitry Gozman <dgozman@chromium.org>
Commit-Queue: Yury Semikhatsky <yurys@chromium.org>
Cr-Commit-Position: refs/heads/master@{#751605}
parent 9dbabf15
......@@ -25,6 +25,7 @@
#include "base/sequenced_task_runner.h"
#include "base/single_thread_task_runner.h"
#include "base/strings/string_util.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/post_task.h"
#include "base/threading/thread.h"
#include "build/build_config.h"
......@@ -48,11 +49,58 @@ const int kWriteFD = 4;
// third_party/inspector_protocol/crdtp/cbor.h.
namespace content {
class PipeReaderBase {
namespace {
class PipeIOBase {
public:
explicit PipeIOBase(const char* thread_name)
: thread_(new base::Thread(thread_name)) {}
virtual ~PipeIOBase() = default;
bool Start() {
base::Thread::Options options;
options.message_pump_type = base::MessagePumpType::IO;
if (!thread_->StartWithOptions(options))
return false;
StartMainLoop();
return true;
}
static void Shutdown(std::unique_ptr<PipeIOBase> pipe_io) {
if (!pipe_io)
return;
auto thread = std::move(pipe_io->thread_);
pipe_io->shutting_down_.Set();
pipe_io->ClosePipe();
// Post self destruction on the custom thread if it's running.
if (thread->task_runner()) {
thread->task_runner()->DeleteSoon(FROM_HERE, std::move(pipe_io));
} else {
pipe_io.reset();
}
// Post background task that would join and destroy the thread.
base::DeleteSoon(
FROM_HERE,
{base::ThreadPool(), base::MayBlock(), base::WithBaseSyncPrimitives(),
base::TaskPriority::BEST_EFFORT},
std::move(thread));
}
protected:
virtual void StartMainLoop() {}
virtual void ClosePipe() = 0;
std::unique_ptr<base::Thread> thread_;
base::AtomicFlag shutting_down_;
};
} // namespace
class PipeReaderBase : public PipeIOBase {
public:
explicit PipeReaderBase(base::WeakPtr<DevToolsPipeHandler> devtools_handler,
int read_fd)
: devtools_handler_(std::move(devtools_handler)) {
PipeReaderBase(base::WeakPtr<DevToolsPipeHandler> devtools_handler,
int read_fd)
: PipeIOBase("DevToolsPipeHandlerReadThread"),
devtools_handler_(std::move(devtools_handler)) {
#if defined(OS_WIN)
read_handle_ = reinterpret_cast<HANDLE>(_get_osfhandle(read_fd));
#else
......@@ -60,16 +108,24 @@ class PipeReaderBase {
#endif
}
virtual ~PipeReaderBase() = default;
protected:
void StartMainLoop() override {
thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&PipeReaderBase::ReadLoop, base::Unretained(this)));
}
void ReadLoop() {
ReadLoopInternal();
base::PostTask(
FROM_HERE, {BrowserThread::UI},
base::BindOnce(&DevToolsPipeHandler::Shutdown, devtools_handler_));
void ClosePipe() override {
// Concurrently discard the pipe handles to successfully join threads.
#if defined(OS_WIN)
// Cancel pending synchronous read.
CancelIoEx(read_handle_, nullptr);
CloseHandle(read_handle_);
#else
shutdown(read_fd_, SHUT_RDWR);
#endif
}
protected:
virtual void ReadLoopInternal() = 0;
size_t ReadBytes(void* buffer, size_t size, bool exact_size) {
......@@ -88,7 +144,8 @@ class PipeReaderBase {
bool had_error = size_read <= 0;
#endif
if (had_error) {
LOG(ERROR) << "Connection terminated while reading from pipe";
if (!shutting_down_.IsSet())
LOG(ERROR) << "Connection terminated while reading from pipe";
return 0;
}
bytes_read += size_read;
......@@ -104,7 +161,14 @@ class PipeReaderBase {
devtools_handler_, std::move(message)));
}
protected:
private:
void ReadLoop() {
ReadLoopInternal();
base::PostTask(
FROM_HERE, {BrowserThread::UI},
base::BindOnce(&DevToolsPipeHandler::Shutdown, devtools_handler_));
}
base::WeakPtr<DevToolsPipeHandler> devtools_handler_;
#if defined(OS_WIN)
HANDLE read_handle_;
......@@ -113,52 +177,91 @@ class PipeReaderBase {
#endif
};
namespace {
class PipeWriterBase : public PipeIOBase {
public:
explicit PipeWriterBase(int write_fd)
: PipeIOBase("DevToolsPipeHandlerWriteThread") {
#if defined(OS_WIN)
write_handle_ = reinterpret_cast<HANDLE>(_get_osfhandle(write_fd));
#else
write_fd_ = write_fd;
#endif
}
const char kDevToolsPipeHandlerReadThreadName[] =
"DevToolsPipeHandlerReadThread";
const char kDevToolsPipeHandlerWriteThreadName[] =
"DevToolsPipeHandlerWriteThread";
void Write(base::span<const uint8_t> message) {
base::TaskRunner* task_runner = thread_->task_runner().get();
task_runner->PostTask(
FROM_HERE,
base::BindOnce(&PipeWriterBase::WriteIntoPipe, base::Unretained(this),
std::string(message.begin(), message.end())));
}
void WriteBytes(int write_fd, const char* bytes, size_t size) {
protected:
void ClosePipe() override {
#if defined(OS_WIN)
HANDLE handle = reinterpret_cast<HANDLE>(_get_osfhandle(write_fd));
CloseHandle(write_handle_);
#else
shutdown(write_fd_, SHUT_RDWR);
#endif
}
virtual void WriteIntoPipe(std::string message) = 0;
size_t total_written = 0;
while (total_written < size) {
size_t length = size - total_written;
if (length > kWritePacketSize)
length = kWritePacketSize;
void WriteBytes(const char* bytes, size_t size) {
size_t total_written = 0;
while (total_written < size) {
size_t length = size - total_written;
if (length > kWritePacketSize)
length = kWritePacketSize;
#if defined(OS_WIN)
DWORD bytes_written = 0;
bool had_error =
!WriteFile(handle, bytes + total_written, static_cast<DWORD>(length),
&bytes_written, nullptr);
DWORD bytes_written = 0;
bool had_error =
!WriteFile(write_handle_, bytes + total_written,
static_cast<DWORD>(length), &bytes_written, nullptr);
#else
int bytes_written = write(write_fd, bytes + total_written, length);
if (bytes_written < 0 && errno == EINTR)
continue;
bool had_error = bytes_written <= 0;
int bytes_written = write(write_fd_, bytes + total_written, length);
if (bytes_written < 0 && errno == EINTR)
continue;
bool had_error = bytes_written <= 0;
#endif
if (had_error) {
LOG(ERROR) << "Could not write into pipe";
return;
if (had_error) {
if (!shutting_down_.IsSet())
LOG(ERROR) << "Could not write into pipe";
return;
}
total_written += bytes_written;
}
total_written += bytes_written;
}
}
void WriteIntoPipeASCIIZ(int write_fd, std::string message) {
WriteBytes(write_fd, message.data(), message.size());
WriteBytes(write_fd, "\0", 1);
}
private:
#if defined(OS_WIN)
HANDLE write_handle_;
#else
int write_fd_;
#endif
};
void WriteIntoPipeCBOR(int write_fd, std::string message) {
DCHECK(crdtp::cbor::IsCBORMessage(crdtp::SpanFrom(message)));
namespace {
WriteBytes(write_fd, message.data(), message.size());
}
class PipeWriterASCIIZ : public PipeWriterBase {
public:
explicit PipeWriterASCIIZ(int write_fd) : PipeWriterBase(write_fd) {}
void WriteIntoPipe(std::string message) override {
WriteBytes(message.data(), message.size());
WriteBytes("\0", 1);
}
};
class PipeWriterCBOR : public PipeWriterBase {
public:
explicit PipeWriterCBOR(int write_fd) : PipeWriterBase(write_fd) {}
void WriteIntoPipe(std::string message) override {
DCHECK(crdtp::cbor::IsCBORMessage(crdtp::SpanFrom(message)));
WriteBytes(message.data(), message.size());
}
};
class PipeReaderASCIIZ : public PipeReaderBase {
public:
......@@ -242,22 +345,6 @@ class PipeReaderCBOR : public PipeReaderBase {
DevToolsPipeHandler::DevToolsPipeHandler()
: read_fd_(kReadFD), write_fd_(kWriteFD) {
read_thread_.reset(new base::Thread(kDevToolsPipeHandlerReadThreadName));
base::Thread::Options options;
options.message_pump_type = base::MessagePumpType::IO;
if (!read_thread_->StartWithOptions(options)) {
read_thread_.reset();
Shutdown();
return;
}
write_thread_.reset(new base::Thread(kDevToolsPipeHandlerWriteThreadName));
if (!write_thread_->StartWithOptions(options)) {
write_thread_.reset();
Shutdown();
return;
}
browser_target_ = DevToolsAgentHost::CreateForBrowser(
nullptr, DevToolsAgentHost::CreateServerSocketCallback());
browser_target_->AttachClient(this);
......@@ -272,70 +359,31 @@ DevToolsPipeHandler::DevToolsPipeHandler()
case ProtocolMode::kASCIIZ:
pipe_reader_ = std::make_unique<PipeReaderASCIIZ>(
weak_factory_.GetWeakPtr(), read_fd_);
pipe_writer_ = std::make_unique<PipeWriterASCIIZ>(write_fd_);
break;
case ProtocolMode::kCBOR:
pipe_reader_ = std::make_unique<PipeReaderCBOR>(
weak_factory_.GetWeakPtr(), read_fd_);
pipe_writer_ = std::make_unique<PipeWriterCBOR>(write_fd_);
break;
}
read_thread_->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&PipeReaderBase::ReadLoop,
base::Unretained(pipe_reader_.get())));
if (!pipe_reader_->Start() || !pipe_writer_->Start())
Shutdown();
}
void DevToolsPipeHandler::Shutdown() {
if (shutting_down_)
return;
shutting_down_ = true;
// Is there is no read thread, there is nothing, it is safe to proceed.
if (!read_thread_)
return;
// If there is no write thread, only take care of the read thread.
if (!write_thread_) {
base::ThreadPool::PostTask(
FROM_HERE,
{base::MayBlock(), base::WithBaseSyncPrimitives(),
base::TaskPriority::BEST_EFFORT},
base::BindOnce([](base::Thread* rthread) { delete rthread; },
read_thread_.release()));
return;
}
// There were threads, disconnect from the target.
// Disconnect from the target.
DCHECK(browser_target_);
browser_target_->DetachClient(this);
browser_target_ = nullptr;
// Concurrently discard the pipe handles to successfully join threads.
#if defined(OS_WIN)
HANDLE read_handle = reinterpret_cast<HANDLE>(_get_osfhandle(read_fd_));
// Cancel pending synchronous read.
CancelIoEx(read_handle, NULL);
CloseHandle(read_handle);
CloseHandle(reinterpret_cast<HANDLE>(_get_osfhandle(write_fd_)));
#else
shutdown(read_fd_, SHUT_RDWR);
shutdown(write_fd_, SHUT_RDWR);
#endif
// Post PipeReader and WeakPtr factory destruction on the reader thread.
read_thread_->task_runner()->PostTask(
FROM_HERE, base::BindOnce([](PipeReaderBase* reader) { delete reader; },
pipe_reader_.release()));
// Post background task that would join and destroy the threads.
base::ThreadPool::PostTask(
FROM_HERE,
{base::MayBlock(), base::WithBaseSyncPrimitives(),
base::TaskPriority::BEST_EFFORT},
base::BindOnce(
[](base::Thread* rthread, base::Thread* wthread) {
delete rthread;
delete wthread;
},
read_thread_.release(), write_thread_.release()));
PipeIOBase::Shutdown(std::move(pipe_reader_));
PipeIOBase::Shutdown(std::move(pipe_writer_));
}
DevToolsPipeHandler::~DevToolsPipeHandler() {
......@@ -352,14 +400,8 @@ void DevToolsPipeHandler::DetachFromTarget() {}
void DevToolsPipeHandler::DispatchProtocolMessage(
DevToolsAgentHost* agent_host,
base::span<const uint8_t> message) {
if (!write_thread_)
return;
base::TaskRunner* task_runner = write_thread_->task_runner().get();
task_runner->PostTask(
FROM_HERE,
base::BindOnce(mode_ == ProtocolMode::kASCIIZ ? WriteIntoPipeASCIIZ
: WriteIntoPipeCBOR,
write_fd_, std::string(message.begin(), message.end())));
if (pipe_writer_)
pipe_writer_->Write(message);
}
void DevToolsPipeHandler::AgentHostClosed(DevToolsAgentHost* agent_host) {}
......
......@@ -10,13 +10,10 @@
#include "base/memory/weak_ptr.h"
#include "content/public/browser/devtools_agent_host_client.h"
namespace base {
class Thread;
}
namespace content {
class PipeReaderBase;
class PipeWriterBase;
class DevToolsPipeHandler : public DevToolsAgentHostClient {
public:
......@@ -45,8 +42,7 @@ class DevToolsPipeHandler : public DevToolsAgentHostClient {
ProtocolMode mode_;
std::unique_ptr<PipeReaderBase> pipe_reader_;
std::unique_ptr<base::Thread> read_thread_;
std::unique_ptr<base::Thread> write_thread_;
std::unique_ptr<PipeWriterBase> pipe_writer_;
scoped_refptr<DevToolsAgentHost> browser_target_;
int read_fd_;
int write_fd_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment