Commit bb3210d3 authored by Robert Sesek's avatar Robert Sesek Committed by Commit Bot

Reland "Reimplement base::WaitableEvent with a kqueue on Mac."

This is a reland of 2096391c
Original change's description:
> Reimplement base::WaitableEvent with a kqueue on Mac.
> 
> For a single WaitableEvent, a custom EVFILT_USER kevent is used to wait and
> signal. This replaces the default POSIX implementation that uses a
> pthread_cond_t and a boolean flag.
> 
> To implement WaitMany, a new kqueue is created to wait on all the individual
> WaitableEvent's kqueue descriptor. This replaces a complex locking algorithm
> used in the default POSIX implementation.
> 
> For the asynchronous WaitableEventWatcher, a TYPE_READ dispatch_source_t is
> used to watch the WaitableEvent's kqueue. This replaces the POSIX
> implementation of a reference-counted list of async watchers guarded by a lock.
> 
> Microbenchmarks show that the kqueue implementation is significantly faster in
> most cases. The one potential drawback is hitting the low RLIMIT_NOFILE on
> macOS, since each WaitableEvent and WaitableEventWatcher requires a new
> descriptor.
> 
> Bug: 681167
> Change-Id: I135012fdd25e547ffb911fc7adc97c203df38241
> Reviewed-on: https://chromium-review.googlesource.com/553497
> Reviewed-by: Robert Liao <robliao@chromium.org>
> Reviewed-by: Mark Mentovai <mark@chromium.org>
> Commit-Queue: Robert Sesek <rsesek@chromium.org>
> Cr-Commit-Position: refs/heads/master@{#485788}

Bug: 681167
Change-Id: I1b0296022f7bd1a31b074fbbb551bccc3b28d05b
Reviewed-on: https://chromium-review.googlesource.com/568642Reviewed-by: default avatarMark Mentovai <mark@chromium.org>
Commit-Queue: Robert Sesek <rsesek@chromium.org>
Cr-Commit-Position: refs/heads/master@{#486449}
parent c94fbe29
...@@ -777,8 +777,10 @@ component("base") { ...@@ -777,8 +777,10 @@ component("base") {
"synchronization/read_write_lock_win.cc", "synchronization/read_write_lock_win.cc",
"synchronization/spin_wait.h", "synchronization/spin_wait.h",
"synchronization/waitable_event.h", "synchronization/waitable_event.h",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_posix.cc", "synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher.h", "synchronization/waitable_event_watcher.h",
"synchronization/waitable_event_watcher_mac.cc",
"synchronization/waitable_event_watcher_posix.cc", "synchronization/waitable_event_watcher_posix.cc",
"synchronization/waitable_event_watcher_win.cc", "synchronization/waitable_event_watcher_win.cc",
"synchronization/waitable_event_win.cc", "synchronization/waitable_event_win.cc",
...@@ -1514,6 +1516,8 @@ component("base") { ...@@ -1514,6 +1516,8 @@ component("base") {
"memory/shared_memory_posix.cc", "memory/shared_memory_posix.cc",
"native_library_posix.cc", "native_library_posix.cc",
"strings/sys_string_conversions_posix.cc", "strings/sys_string_conversions_posix.cc",
"synchronization/waitable_event_posix.cc",
"synchronization/waitable_event_watcher_posix.cc",
"threading/platform_thread_internal_posix.cc", "threading/platform_thread_internal_posix.cc",
] ]
...@@ -1640,6 +1644,8 @@ component("base") { ...@@ -1640,6 +1644,8 @@ component("base") {
"power_monitor/power_monitor_device_source_ios.mm", "power_monitor/power_monitor_device_source_ios.mm",
"process/memory_stubs.cc", "process/memory_stubs.cc",
"strings/sys_string_conversions_mac.mm", "strings/sys_string_conversions_mac.mm",
"synchronization/waitable_event_mac.cc",
"synchronization/waitable_event_watcher_mac.cc",
"threading/platform_thread_mac.mm", "threading/platform_thread_mac.mm",
"time/time_conversion_posix.cc", "time/time_conversion_posix.cc",
"time/time_mac.cc", "time/time_mac.cc",
......
...@@ -13,11 +13,12 @@ ...@@ -13,11 +13,12 @@
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/win/scoped_handle.h" #include "base/win/scoped_handle.h"
#endif #elif defined(OS_MACOSX)
#include "base/files/scoped_file.h"
#if defined(OS_POSIX) #elif defined(OS_POSIX)
#include <list> #include <list>
#include <utility> #include <utility>
#include "base/memory/ref_counted.h" #include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h" #include "base/synchronization/lock.h"
#endif #endif
...@@ -154,6 +155,14 @@ class BASE_EXPORT WaitableEvent { ...@@ -154,6 +155,14 @@ class BASE_EXPORT WaitableEvent {
#if defined(OS_WIN) #if defined(OS_WIN)
win::ScopedHandle handle_; win::ScopedHandle handle_;
#elif defined(OS_MACOSX)
// The kqueue used to signal and wait on a custom user event.
ScopedFD kqueue_;
// Creates a kevent64_s, filling in the values using EV_SET64() with the
// specified flags and filter flags, and then submits it as a change to the
// |kqueue_|.
void PostEvent(uint16_t flags, uint32_t fflags);
#else #else
// On Windows, you must not close a HANDLE which is currently being waited on. // On Windows, you must not close a HANDLE which is currently being waited on.
// The MSDN documentation says that the resulting behaviour is 'undefined'. // The MSDN documentation says that the resulting behaviour is 'undefined'.
......
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event.h"
#include <sys/event.h>
#include <vector>
#include "base/debug/activity_tracker.h"
#include "base/logging.h"
#include "base/posix/eintr_wrapper.h"
#include "base/threading/thread_restrictions.h"
namespace base {
WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
InitialState initial_state)
: kqueue_(kqueue()) {
PCHECK(kqueue_.is_valid()) << "kqueue";
uint16_t flags = EV_ADD;
if (reset_policy == ResetPolicy::AUTOMATIC)
flags |= EV_CLEAR;
// The initial event registration.
PostEvent(flags, 0);
if (initial_state == InitialState::SIGNALED)
Signal();
}
WaitableEvent::~WaitableEvent() = default;
void WaitableEvent::Reset() {
PostEvent(EV_DISABLE, 0);
}
void WaitableEvent::Signal() {
PostEvent(EV_ENABLE, NOTE_TRIGGER);
}
bool WaitableEvent::IsSignaled() {
// TODO(rsesek): Use KEVENT_FLAG_IMMEDIATE rather than an empty timeout.
timespec ts{};
kevent64_s event;
int rv = kevent64(kqueue_.get(), nullptr, 0, &event, 1, 0, &ts);
PCHECK(rv >= 0) << "kevent64 IsSignaled";
return rv > 0;
}
void WaitableEvent::Wait() {
bool result = TimedWaitUntil(TimeTicks::Max());
DCHECK(result) << "TimedWait() should never fail with infinite timeout";
}
bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
return TimedWaitUntil(TimeTicks::Now() + wait_delta);
}
bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) {
ThreadRestrictions::AssertWaitAllowed();
// Record the event that this thread is blocking upon (for hang diagnosis).
debug::ScopedEventWaitActivity event_activity(this);
bool indefinite = end_time.is_max();
int rv = 0;
do {
TimeDelta wait_time = end_time - TimeTicks::Now();
if (wait_time < TimeDelta()) {
// A negative delta would be treated by the system as indefinite, but
// it needs to be treated as a poll instead.
wait_time = TimeDelta();
}
timespec timeout = wait_time.ToTimeSpec();
// This does not use HANDLE_EINTR, since retrying the syscall requires
// adjusting the timeout to account for time already waited.
kevent64_s event;
rv = kevent64(kqueue_.get(), nullptr, 0, &event, 1, 0,
indefinite ? nullptr : &timeout);
} while (rv < 0 && errno == EINTR);
PCHECK(rv >= 0) << "kevent64 TimedWait";
return rv > 0;
}
// static
size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
ThreadRestrictions::AssertWaitAllowed();
DCHECK(count) << "Cannot wait on no events";
// Record an event (the first) that this thread is blocking upon.
debug::ScopedEventWaitActivity event_activity(raw_waitables[0]);
std::vector<kevent64_s> events(count);
for (size_t i = 0; i < count; ++i) {
EV_SET64(&events[i], raw_waitables[i]->kqueue_.get(), EVFILT_READ,
EV_ADD | EV_CLEAR, 0, 0, i, 0, 0);
}
std::vector<kevent64_s> out_events(count);
ScopedFD wait_many(kqueue());
PCHECK(wait_many.is_valid()) << "kqueue WaitMany";
int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count,
out_events.data(), count, 0, nullptr));
PCHECK(rv > 0) << "kevent64: WaitMany";
size_t triggered = -1;
for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
// WaitMany should return the lowest index in |raw_waitables| that was
// triggered.
size_t index = static_cast<size_t>(out_events[i].udata);
triggered = std::min(triggered, index);
}
// The WaitMany kevent has identified which kqueue was signaled. Trigger
// a Wait on it to clear the event within WaitableEvent's kqueue. This
// will not block, since it has been triggered.
raw_waitables[triggered]->Wait();
return triggered;
}
void WaitableEvent::PostEvent(uint16_t flags, uint32_t fflags) {
kevent64_s event;
EV_SET64(&event, reinterpret_cast<uint64_t>(this), EVFILT_USER, flags, fflags,
0, 0, 0, 0);
int rv =
HANDLE_EINTR(kevent64(kqueue_.get(), &event, 1, nullptr, 0, 0, nullptr));
PCHECK(rv == 0) << "kevent64";
}
} // namespace base
...@@ -12,12 +12,21 @@ ...@@ -12,12 +12,21 @@
#if defined(OS_WIN) #if defined(OS_WIN)
#include "base/win/object_watcher.h" #include "base/win/object_watcher.h"
#include "base/win/scoped_handle.h" #include "base/win/scoped_handle.h"
#elif defined(OS_MACOSX)
#include <dispatch/dispatch.h>
#include "base/files/scoped_file.h"
#include "base/mac/scoped_dispatch_object.h"
#include "base/memory/weak_ptr.h"
#else #else
#include "base/callback.h"
#include "base/sequence_checker.h" #include "base/sequence_checker.h"
#include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event.h"
#endif #endif
#if !defined(OS_WIN)
#include "base/callback.h"
#endif
namespace base { namespace base {
class Flag; class Flag;
...@@ -101,6 +110,27 @@ class BASE_EXPORT WaitableEventWatcher ...@@ -101,6 +110,27 @@ class BASE_EXPORT WaitableEventWatcher
EventCallback callback_; EventCallback callback_;
WaitableEvent* event_ = nullptr; WaitableEvent* event_ = nullptr;
#elif defined(OS_MACOSX)
// Invokes the callback and resets the source. Must be called on the task
// runner on which StartWatching() was called.
void InvokeCallback();
// Closure bound to the event being watched. This will be is_null() if
// nothing is being watched.
OnceClosure callback_;
// A dup()'d descriptor of the kqueue used by the WaitableEvent being
// watched, or the invalid value if nothing is.
ScopedFD kqueue_;
// A TYPE_READ dispatch source on |kqueue_|. When a read event is delivered,
// the kqueue has an event pending, and the bound |callback_| will be
// invoked. This will be null if nothing is currently being watched.
ScopedDispatchObject<dispatch_source_t> source_;
// Used to vend a weak pointer for calling InvokeCallback() from the
// |source_| event handler.
WeakPtrFactory<WaitableEventWatcher> weak_ptr_factory_;
#else #else
// Instantiated in StartWatching(). Set before the callback runs. Reset in // Instantiated in StartWatching(). Set before the callback runs. Reset in
// StopWatching() or StartWatching(). // StopWatching() or StartWatching().
......
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event_watcher.h"
#include "base/bind.h"
#include "base/callback.h"
#include "base/synchronization/waitable_event.h"
#include "base/threading/sequenced_task_runner_handle.h"
namespace base {
WaitableEventWatcher::WaitableEventWatcher() : weak_ptr_factory_(this) {}
WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}
bool WaitableEventWatcher::StartWatching(WaitableEvent* event,
EventCallback callback) {
DCHECK(!source_ || dispatch_source_testcancel(source_));
kqueue_.reset(dup(event->kqueue_.get()));
if (!kqueue_.is_valid()) {
PLOG(ERROR) << "dup kqueue";
return false;
}
// Use the global concurrent queue here, since it is only used to thunk
// to the real callback on the target task runner.
source_.reset(dispatch_source_create(
DISPATCH_SOURCE_TYPE_READ, kqueue_.get(), 0,
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)));
callback_ = BindOnce(std::move(callback), event);
scoped_refptr<SequencedTaskRunner> task_runner =
SequencedTaskRunnerHandle::Get();
WeakPtr<WaitableEventWatcher> weak_this = weak_ptr_factory_.GetWeakPtr();
dispatch_source_set_event_handler(source_, ^{
// Cancel the source immediately, since libdispatch will continue to send
// events until the kqueue is drained.
dispatch_source_cancel(source_);
task_runner->PostTask(
FROM_HERE, BindOnce(&WaitableEventWatcher::InvokeCallback, weak_this));
});
dispatch_resume(source_);
return true;
}
void WaitableEventWatcher::StopWatching() {
callback_.Reset();
if (source_) {
dispatch_source_cancel(source_);
source_.reset();
}
kqueue_.reset();
}
void WaitableEventWatcher::InvokeCallback() {
// The callback can be null if StopWatching() is called between signaling
// and the |callback_| getting run on the target task runner.
if (callback_.is_null())
return;
source_.reset();
std::move(callback_).Run();
}
} // namespace base
...@@ -784,6 +784,8 @@ TEST_F(TaskSchedulerTaskTrackerTest, ...@@ -784,6 +784,8 @@ TEST_F(TaskSchedulerTaskTrackerTest,
for (const auto& thread : post_threads) for (const auto& thread : post_threads)
thread->Join(); thread->Join();
// Clean up unused Thread objects to avoid running out of system resources.
post_threads.clear();
// Call Shutdown() asynchronously. // Call Shutdown() asynchronously.
CallShutdownAsync(); CallShutdownAsync();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment