Commit 1be89cc4 authored by Wez's avatar Wez Committed by Commit Bot

Reland "Fix threading in FileDescriptorWatcher and MessagePumpLibevent tests."

This is a reland of 45df4f39
Original change's description:
> Fix threading in FileDescriptorWatcher and MessagePumpLibevent tests.
> 
> Changes to MessagePumpLibevent & tests:
> 1. Add an explicit wait for thread start-up to TestWatchingFromBadThread,
> which was not updated when the base::ThreadStart() call was changed to
> not wait for the thread startup to complete.
> 2. Reinstate TestWatchingFromBadThread on all libevent platforms.
> 3. Add sanity-check tests for readable/writable notifications.
> 4. Clean up some .get()s and add some DPLOGs.
> 
> Changes to FileDescriptorWatcher & tests:
> 1. Add a DLOG_IF() to StartWatching, to make it easier to spot
> WatchFileDescriptor failures. This can't be a DCHECK or CHECK, since the
> StartWatching operation is racey with file-descriptor close() on the
> calling thread.
> 2. Fix the test fixture to Stop() |other_thread_| to ensure it is done
> working before we tear down the test file descriptors.
> 3. Fix the WatchWritable test to request write notifications for the
> correct end of the pipe (since pipe()s are uni-directional).
> 
> Bug: 138845,740692
> 
> TBR=dcheng,fdoray
> 
> Change-Id: I86bdd100561fbc56c1e6291216cafc3168b185dc
> Reviewed-on: https://chromium-review.googlesource.com/565227
> Commit-Queue: Wez <wez@chromium.org>
> Reviewed-by: Daniel Cheng <dcheng@chromium.org>
> Reviewed-by: Wez <wez@chromium.org>
> Reviewed-by: Nico Weber <thakis@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#485822}

TBR: dcheng, thakis
Bug: 138845, 740692
Change-Id: Ibc6e48316351b311bfa06727964bbcf4cf5e699b
Reviewed-on: https://chromium-review.googlesource.com/566840Reviewed-by: default avatarWez <wez@chromium.org>
Commit-Queue: Wez <wez@chromium.org>
Cr-Commit-Position: refs/heads/master@{#485880}
parent d1af288f
...@@ -108,8 +108,13 @@ FileDescriptorWatcher::Controller::Watcher::~Watcher() { ...@@ -108,8 +108,13 @@ FileDescriptorWatcher::Controller::Watcher::~Watcher() {
void FileDescriptorWatcher::Controller::Watcher::StartWatching() { void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
DCHECK(thread_checker_.CalledOnValidThread()); DCHECK(thread_checker_.CalledOnValidThread());
MessageLoopForIO::current()->WatchFileDescriptor( if (!MessageLoopForIO::current()->WatchFileDescriptor(
fd_, false, mode_, &file_descriptor_watcher_, this); fd_, false, mode_, &file_descriptor_watcher_, this)) {
// TODO(wez): Ideally we would [D]CHECK here, or propagate the failure back
// to the caller, but there is no guarantee that they haven't already
// closed |fd_| on another thread, so the best we can do is Debug-log.
DLOG(ERROR) << "Failed to watch fd=" << fd_;
}
if (!registered_as_destruction_observer_) { if (!registered_as_destruction_observer_) {
MessageLoopForIO::current()->AddDestructionObserver(this); MessageLoopForIO::current()->AddDestructionObserver(this);
......
...@@ -81,6 +81,9 @@ class FileDescriptorWatcherTest ...@@ -81,6 +81,9 @@ class FileDescriptorWatcherTest
base::RunLoop().RunUntilIdle(); base::RunLoop().RunUntilIdle();
} }
// Ensure that OtherThread is done processing before closing fds.
other_thread_.Stop();
EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[0]))); EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[0])));
EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[1]))); EXPECT_EQ(0, IGNORE_EINTR(close(pipe_fds_[1])));
} }
...@@ -116,7 +119,7 @@ class FileDescriptorWatcherTest ...@@ -116,7 +119,7 @@ class FileDescriptorWatcherTest
std::unique_ptr<FileDescriptorWatcher::Controller> WatchWritable() { std::unique_ptr<FileDescriptorWatcher::Controller> WatchWritable() {
std::unique_ptr<FileDescriptorWatcher::Controller> controller = std::unique_ptr<FileDescriptorWatcher::Controller> controller =
FileDescriptorWatcher::WatchWritable( FileDescriptorWatcher::WatchWritable(
read_file_descriptor(), write_file_descriptor(),
Bind(&Mock::WritableCallback, Unretained(&mock_))); Bind(&Mock::WritableCallback, Unretained(&mock_)));
EXPECT_TRUE(controller); EXPECT_TRUE(controller);
return controller; return controller;
...@@ -167,14 +170,11 @@ class FileDescriptorWatcherTest ...@@ -167,14 +170,11 @@ class FileDescriptorWatcherTest
TEST_P(FileDescriptorWatcherTest, WatchWritable) { TEST_P(FileDescriptorWatcherTest, WatchWritable) {
auto controller = WatchWritable(); auto controller = WatchWritable();
// On Mac and iOS, the write end of a newly created pipe is writable without // The write end of a newly created pipe is immediately writable.
// blocking.
#if defined(OS_MACOSX)
RunLoop run_loop; RunLoop run_loop;
EXPECT_CALL(mock_, WritableCallback()) EXPECT_CALL(mock_, WritableCallback())
.WillOnce(testing::Invoke(&run_loop, &RunLoop::Quit)); .WillOnce(testing::Invoke(&run_loop, &RunLoop::Quit));
run_loop.Run(); run_loop.Run();
#endif // defined(OS_MACOSX)
} }
TEST_P(FileDescriptorWatcherTest, WatchReadableOneByte) { TEST_P(FileDescriptorWatcherTest, WatchReadableOneByte) {
......
...@@ -153,13 +153,12 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, ...@@ -153,13 +153,12 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd,
} }
std::unique_ptr<event> evt(controller->ReleaseEvent()); std::unique_ptr<event> evt(controller->ReleaseEvent());
if (evt.get() == NULL) { if (!evt) {
// Ownership is transferred to the controller. // Ownership is transferred to the controller.
evt.reset(new event); evt.reset(new event);
} else { } else {
// Make sure we don't pick up any funky internal libevent masks. // Make sure we don't pick up any funky internal libevent masks.
int old_interest_mask = evt.get()->ev_events & int old_interest_mask = evt->ev_events & (EV_READ | EV_WRITE | EV_PERSIST);
(EV_READ | EV_WRITE | EV_PERSIST);
// Combine old/new event masks. // Combine old/new event masks.
event_mask |= old_interest_mask; event_mask |= old_interest_mask;
...@@ -180,11 +179,13 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd, ...@@ -180,11 +179,13 @@ bool MessagePumpLibevent::WatchFileDescriptor(int fd,
// Tell libevent which message pump this socket will belong to when we add it. // Tell libevent which message pump this socket will belong to when we add it.
if (event_base_set(event_base_, evt.get())) { if (event_base_set(event_base_, evt.get())) {
DPLOG(ERROR) << "event_base_set(fd=" << EVENT_FD(evt.get()) << ")";
return false; return false;
} }
// Add this socket to the list of monitored sockets. // Add this socket to the list of monitored sockets.
if (event_add(evt.get(), NULL)) { if (event_add(evt.get(), NULL)) {
DPLOG(ERROR) << "event_add failed(fd=" << EVENT_FD(evt.get()) << ")";
return false; return false;
} }
......
...@@ -50,6 +50,10 @@ class MessagePumpLibeventTest : public testing::Test { ...@@ -50,6 +50,10 @@ class MessagePumpLibeventTest : public testing::Test {
PLOG(ERROR) << "close"; PLOG(ERROR) << "close";
} }
void WaitUntilIoThreadStarted() {
ASSERT_TRUE(io_thread_.WaitUntilThreadStarted());
}
MessageLoopForIO* io_loop() const { MessageLoopForIO* io_loop() const {
return static_cast<MessageLoopForIO*>(io_thread_.message_loop()); return static_cast<MessageLoopForIO*>(io_thread_.message_loop());
} }
...@@ -82,16 +86,14 @@ class StupidWatcher : public MessagePumpLibevent::Watcher { ...@@ -82,16 +86,14 @@ class StupidWatcher : public MessagePumpLibevent::Watcher {
// Test to make sure that we catch calling WatchFileDescriptor off of the // Test to make sure that we catch calling WatchFileDescriptor off of the
// wrong thread. // wrong thread.
#if defined(OS_CHROMEOS) || defined(OS_LINUX) TEST_F(MessagePumpLibeventTest, TestWatchingFromBadThread) {
// Flaky on Chrome OS and Linux: crbug.com/138845.
#define MAYBE_TestWatchingFromBadThread DISABLED_TestWatchingFromBadThread
#else
#define MAYBE_TestWatchingFromBadThread TestWatchingFromBadThread
#endif
TEST_F(MessagePumpLibeventTest, MAYBE_TestWatchingFromBadThread) {
MessagePumpLibevent::FileDescriptorWatcher watcher(FROM_HERE); MessagePumpLibevent::FileDescriptorWatcher watcher(FROM_HERE);
StupidWatcher delegate; StupidWatcher delegate;
// Ensure that |io_thread_| has started, otherwise we're racing against
// creation of the thread's MessagePump.
WaitUntilIoThreadStarted();
ASSERT_DCHECK_DEATH( ASSERT_DCHECK_DEATH(
io_loop()->WatchFileDescriptor(STDOUT_FILENO, false, io_loop()->WatchFileDescriptor(STDOUT_FILENO, false,
MessageLoopForIO::WATCH_READ, &watcher, MessageLoopForIO::WATCH_READ, &watcher,
...@@ -213,20 +215,18 @@ void FatalClosure() { ...@@ -213,20 +215,18 @@ void FatalClosure() {
class QuitWatcher : public BaseWatcher { class QuitWatcher : public BaseWatcher {
public: public:
QuitWatcher(MessagePumpLibevent::FileDescriptorWatcher* controller, QuitWatcher(MessagePumpLibevent::FileDescriptorWatcher* controller,
RunLoop* run_loop) base::Closure quit_closure)
: BaseWatcher(controller), run_loop_(run_loop) {} : BaseWatcher(controller), quit_closure_(std::move(quit_closure)) {}
~QuitWatcher() override {}
void OnFileCanReadWithoutBlocking(int /* fd */) override { void OnFileCanReadWithoutBlocking(int /* fd */) override {
// Post a fatal closure to the MessageLoop before we quit it. // Post a fatal closure to the MessageLoop before we quit it.
ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, BindOnce(&FatalClosure)); ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE, BindOnce(&FatalClosure));
// Now quit the MessageLoop. quit_closure_.Run();
run_loop_->Quit();
} }
private: private:
RunLoop* run_loop_; // weak base::Closure quit_closure_;
}; };
void WriteFDWrapper(const int fd, void WriteFDWrapper(const int fd,
...@@ -246,7 +246,7 @@ TEST_F(MessagePumpLibeventTest, QuitWatcher) { ...@@ -246,7 +246,7 @@ TEST_F(MessagePumpLibeventTest, QuitWatcher) {
MessageLoop loop(WrapUnique(pump)); MessageLoop loop(WrapUnique(pump));
RunLoop run_loop; RunLoop run_loop;
MessagePumpLibevent::FileDescriptorWatcher controller(FROM_HERE); MessagePumpLibevent::FileDescriptorWatcher controller(FROM_HERE);
QuitWatcher delegate(&controller, &run_loop); QuitWatcher delegate(&controller, run_loop.QuitClosure());
WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC, WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
WaitableEvent::InitialState::NOT_SIGNALED); WaitableEvent::InitialState::NOT_SIGNALED);
std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher); std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
...@@ -277,6 +277,87 @@ TEST_F(MessagePumpLibeventTest, QuitWatcher) { ...@@ -277,6 +277,87 @@ TEST_F(MessagePumpLibeventTest, QuitWatcher) {
BindOnce(&WaitableEventWatcher::StopWatching, Owned(watcher.release()))); BindOnce(&WaitableEventWatcher::StopWatching, Owned(watcher.release())));
} }
class CaptureAndQuitWatcher : public BaseWatcher {
public:
CaptureAndQuitWatcher(MessagePumpLibevent::FileDescriptorWatcher* controller,
base::Closure quit_closure)
: BaseWatcher(controller), quit_closure_(std::move(quit_closure)) {}
void OnFileCanReadWithoutBlocking(int /* fd */) override {
is_readable_ = true;
quit_closure_.Run();
}
void OnFileCanWriteWithoutBlocking(int /* fd */) override {
is_writable_ = true;
quit_closure_.Run();
}
bool is_readable_ = false;
bool is_writable_ = false;
private:
base::Closure quit_closure_;
};
// Verify that basic readable notification works.
TEST_F(MessagePumpLibeventTest, WatchReadable) {
// Tear-down the old MessageLoop before creating the replacement.
ui_loop_.reset();
ui_loop_ = base::MakeUnique<MessageLoopForIO>();
MessagePumpLibevent::FileDescriptorWatcher watcher(FROM_HERE);
RunLoop run_loop;
CaptureAndQuitWatcher delegate(&watcher, run_loop.QuitClosure());
// Watch the pipe for readability.
ASSERT_TRUE(MessageLoopForIO::current()->WatchFileDescriptor(
pipefds_[0], /* persistent= */ false, MessageLoopForIO::WATCH_READ,
&watcher, &delegate));
// The pipe should not be readable when first created.
base::RunLoop().RunUntilIdle();
ASSERT_FALSE(delegate.is_readable_);
ASSERT_FALSE(delegate.is_writable_);
// Write a byte to the other end, making it readable.
const char buf = 0;
ASSERT_TRUE(WriteFileDescriptor(pipefds_[1], &buf, sizeof(buf)));
// We don't want to assume that the read fd becomes readable the
// instant a bytes is written, so Run until quit by an event.
run_loop.Run();
ASSERT_TRUE(delegate.is_readable_);
ASSERT_FALSE(delegate.is_writable_);
}
// Verify that watching a file descriptor for writability succeeds.
TEST_F(MessagePumpLibeventTest, WatchWritable) {
// Tear-down the old MessageLoop before creating the replacement.
ui_loop_.reset();
ui_loop_ = base::MakeUnique<MessageLoopForIO>();
MessagePumpLibevent::FileDescriptorWatcher watcher(FROM_HERE);
RunLoop run_loop;
CaptureAndQuitWatcher delegate(&watcher, run_loop.QuitClosure());
// Watch the pipe for writability.
ASSERT_TRUE(MessageLoopForIO::current()->WatchFileDescriptor(
pipefds_[1], /* persistent= */ false, MessageLoopForIO::WATCH_WRITE,
&watcher, &delegate));
// We should not receive a writable notification until we process events.
ASSERT_FALSE(delegate.is_readable_);
ASSERT_FALSE(delegate.is_writable_);
// The pipe should be writable immediately, so no need to wait for
// the quit closure.
run_loop.RunUntilIdle();
ASSERT_FALSE(delegate.is_readable_);
ASSERT_TRUE(delegate.is_writable_);
}
} // namespace } // namespace
} // namespace base } // namespace base
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment